From edabd23e12d75058d7ce5d92e38dcc9f66f83c53 Mon Sep 17 00:00:00 2001 From: Filippo Neysofu Costa Date: Wed, 6 Mar 2024 17:28:26 +0100 Subject: [PATCH] Update repository content with the latest upstream changes (#1) * v2.0.0-rc.0 Signed-off-by: Filippo Costa * Fix Clippy lints Signed-off-by: Filippo Costa * Fix other Clippy lints Signed-off-by: Filippo Costa * Fix other Clippy lints Signed-off-by: Filippo Costa --------- Signed-off-by: Filippo Costa --- Cargo.toml | 8 +- rust-toolchain.toml | 3 + src/cache/cache_container.rs | 995 +++++++++++++++++++++++++++++++++++ src/cache/cache_db.rs | 581 ++++++++++++++++++++ src/cache/change_set.rs | 56 ++ src/cache/mod.rs | 8 + src/iterator.rs | 438 ++++++++++++++- src/lib.rs | 121 ++++- src/metrics.rs | 38 +- src/schema_batch.rs | 260 +++++++-- src/snapshot.rs | 508 ------------------ src/test.rs | 35 +- tests/iterator_test.rs | 81 ++- tests/snapshot_test.rs | 50 +- 14 files changed, 2522 insertions(+), 660 deletions(-) create mode 100644 rust-toolchain.toml create mode 100644 src/cache/cache_container.rs create mode 100644 src/cache/cache_db.rs create mode 100644 src/cache/change_set.rs create mode 100644 src/cache/mod.rs delete mode 100644 src/snapshot.rs diff --git a/Cargo.toml b/Cargo.toml index e571b09..b0692ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ readme = "README.md" anyhow = "1" byteorder = { version = "1", default-features = true, optional = true } once_cell = "1" -prometheus = { version = "0.13", optional = true } +prometheus = { version = "0.13" } proptest = { version = "1", optional = true } proptest-derive = { version = "0.4", optional = true } rocksdb = { version = "0.21" } @@ -30,7 +30,5 @@ tempfile = "3" [features] default = [] - -arbitrary = ["dep:proptest", "dep:proptest-derive"] -prometheus = ["dep:prometheus"] -test-utils = ["dep:byteorder"] +arbitrary = ["dep:proptest", "dep:proptest-derive", "rockbound/arbitrary"] +test-utils = ["dep:byteorder", "arbitrary"] diff --git 
a/rust-toolchain.toml b/rust-toolchain.toml new file mode 100644 index 0000000..73cb934 --- /dev/null +++ b/rust-toolchain.toml @@ -0,0 +1,3 @@ +[toolchain] +channel = "stable" +components = ["rustfmt", "clippy"] diff --git a/src/cache/cache_container.rs b/src/cache/cache_container.rs new file mode 100644 index 0000000..3c8c80e --- /dev/null +++ b/src/cache/cache_container.rs @@ -0,0 +1,995 @@ +//! CacheContainer module responsible for correct traversal of cache layers. +use std::cmp::Ordering; +use std::collections::{btree_map, HashMap}; +use std::iter::{Peekable, Rev}; + +use crate::cache::cache_container::DataLocation::Snapshot; +use crate::cache::change_set::ChangeSet; +use crate::cache::SnapshotId; +use crate::iterator::ScanDirection; +use crate::schema::{KeyCodec, ValueCodec}; +use crate::{Operation, RawDbIter, ReadOnlyLock, Schema, SchemaKey, SchemaValue, DB}; + +/// Holds collection of [`ChangeSet`]'s associated with particular Snapshot +/// and knows how to traverse them. +/// Managed externally. +/// Should be managed carefully, because discrepancy between `snapshots` and `to_parent` leads to panic +/// Ideally owner of writable reference to parent nad owner of cache container manages both correctly. +#[derive(Debug)] +pub struct CacheContainer { + db: DB, + /// Set of [`ChangeSet`]s of data per individual database per snapshot + snapshots: HashMap, + /// Hierarchical + /// Shared between all CacheContainers and managed by StorageManager + to_parent: ReadOnlyLock>, +} + +impl CacheContainer { + /// Create CacheContainer pointing go given DB and Snapshot ID relations + pub fn new(db: DB, to_parent: ReadOnlyLock>) -> Self { + Self { + db, + snapshots: HashMap::new(), + to_parent, + } + } + + /// Create instance of snapshot manager, when it does not have connection to snapshots tree + /// So it only reads from database. 
+ #[cfg(feature = "test-utils")] + pub fn orphan(db: DB) -> Self { + Self { + db, + snapshots: HashMap::new(), + to_parent: std::sync::Arc::new(std::sync::RwLock::new(Default::default())).into(), + } + } + + /// Adds Snapshot to the collection. + /// Please note that caller must update its own reference of `to_parent` + /// After adding snapshot. + /// Ideally these operations should be atomic + pub fn add_snapshot(&mut self, snapshot: ChangeSet) -> anyhow::Result<()> { + let snapshot_id = snapshot.id(); + if self.snapshots.contains_key(&snapshot_id) { + anyhow::bail!("Attempt to double save same snapshot id={}", snapshot_id); + } + self.snapshots.insert(snapshot_id, snapshot); + Ok(()) + } + + /// Removes snapshot from collection + /// This should happen **after** `to_parent` is updated + pub fn discard_snapshot(&mut self, snapshot_id: &SnapshotId) -> Option { + self.snapshots.remove(snapshot_id) + } + + /// Writes snapshot to the underlying database + /// Snapshot id should be removed from `to_parent` atomically. + pub fn commit_snapshot(&mut self, snapshot_id: &SnapshotId) -> anyhow::Result<()> { + let snapshot = self.snapshots.remove(snapshot_id).ok_or_else(|| { + anyhow::anyhow!("Attempt to commit unknown snapshot id={}", snapshot_id) + })?; + self.db.write_schemas(snapshot.into()) + } + + /// Indicates, if CacheContainer has any snapshots in memory. + /// Does not mean that underlying database is empty. 
+ pub fn is_empty(&self) -> bool { + self.snapshots.is_empty() + } + + /// Helper method to check if snapshot with given ID has been added already + pub fn contains_snapshot(&self, snapshot_id: &SnapshotId) -> bool { + self.snapshots.contains_key(snapshot_id) + } + + pub(crate) fn get( + &self, + mut snapshot_id: SnapshotId, + key: &impl KeyCodec, + ) -> anyhow::Result> { + while let Some(parent_snapshot_id) = self.to_parent.read().unwrap().get(&snapshot_id) { + let parent_snapshot = self + .snapshots + .get(parent_snapshot_id) + .expect("Inconsistency between `self.snapshots` and `self.to_parent`: snapshot is missing in container"); + + // Some operation has been found + if let Some(operation) = parent_snapshot.get(key)? { + return match operation { + Operation::Put { value } => Ok(Some(S::Value::decode_value(value)?)), + Operation::Delete => Ok(None), + }; + } + + snapshot_id = *parent_snapshot_id; + } + self.db.get(key) + } + + fn snapshot_iterators( + &self, + mut snapshot_id: SnapshotId, + ) -> Vec> { + let mut snapshot_iterators = vec![]; + let to_parent = self.to_parent.read().unwrap(); + while let Some(parent_snapshot_id) = to_parent.get(&snapshot_id) { + let parent_snapshot = self + .snapshots + .get(parent_snapshot_id) + .expect("Inconsistency between `self.snapshots` and `self.to_parent`"); + + snapshot_iterators.push(parent_snapshot.iter::()); + + snapshot_id = *parent_snapshot_id; + } + + snapshot_iterators.reverse(); + snapshot_iterators + } + + // Allow this unused method, for future use + #[allow(dead_code)] + pub(crate) fn iter( + &self, + snapshot_id: SnapshotId, + ) -> anyhow::Result>> { + let snapshot_iterators = self.snapshot_iterators::(snapshot_id); + let db_iter = self.db.raw_iter::(ScanDirection::Forward)?; + + Ok(CacheContainerIter::new( + db_iter, + snapshot_iterators, + ScanDirection::Forward, + )) + } + + /// Returns iterator over keys in given [`Schema`] among all snapshots and DB in reverse lexicographical order + pub(crate) fn 
rev_iter( + &self, + snapshot_id: SnapshotId, + ) -> anyhow::Result>>> { + let snapshot_iterators = self + .snapshot_iterators::(snapshot_id) + .into_iter() + .map(Iterator::rev) + .collect(); + let db_iter = self.db.raw_iter::(ScanDirection::Backward)?; + + Ok(CacheContainerIter::new( + db_iter, + snapshot_iterators, + ScanDirection::Backward, + )) + } + + pub(crate) fn iter_range( + &self, + mut snapshot_id: SnapshotId, + range: impl std::ops::RangeBounds + Clone, + ) -> anyhow::Result>> { + let mut snapshot_iterators = vec![]; + let to_parent = self.to_parent.read().unwrap(); + while let Some(parent_snapshot_id) = to_parent.get(&snapshot_id) { + let parent_snapshot = self + .snapshots + .get(parent_snapshot_id) + .expect("Inconsistency between `self.snapshots` and `self.to_parent`"); + + snapshot_iterators.push(parent_snapshot.iter_range::(range.clone())); + + snapshot_id = *parent_snapshot_id; + } + + snapshot_iterators.reverse(); + + let db_iter = self.db.raw_iter_range::(range, ScanDirection::Forward)?; + Ok(CacheContainerIter::new( + db_iter, + snapshot_iterators, + ScanDirection::Forward, + )) + } + + pub(crate) fn rev_iter_range( + &self, + mut snapshot_id: SnapshotId, + range: impl std::ops::RangeBounds + Clone, + ) -> anyhow::Result>>> { + let mut snapshot_iterators = vec![]; + let to_parent = self.to_parent.read().unwrap(); + while let Some(parent_snapshot_id) = to_parent.get(&snapshot_id) { + let parent_snapshot = self + .snapshots + .get(parent_snapshot_id) + .expect("Inconsistency between `self.snapshots` and `self.to_parent`"); + + snapshot_iterators.push(parent_snapshot.iter_range::(range.clone()).rev()); + snapshot_id = *parent_snapshot_id; + } + + snapshot_iterators.reverse(); + let db_iter = self + .db + .raw_iter_range::(range, ScanDirection::Backward)?; + + Ok(CacheContainerIter::new( + db_iter, + snapshot_iterators, + ScanDirection::Backward, + )) + } +} + +/// [`Iterator`] over keys in given [`Schema`] in all snapshots in reverse +/// 
lexicographical order. +pub(crate) struct CacheContainerIter<'a, SnapshotIter> +where + SnapshotIter: Iterator, +{ + db_iter: Peekable>, + snapshot_iterators: Vec>, + next_value_locations: Vec, + direction: ScanDirection, +} + +impl<'a, SnapshotIter> CacheContainerIter<'a, SnapshotIter> +where + SnapshotIter: Iterator, +{ + fn new( + db_iter: RawDbIter<'a>, + snapshot_iterators: Vec, + direction: ScanDirection, + ) -> Self { + let max_values_size = snapshot_iterators.len().checked_add(1).unwrap_or_default(); + Self { + db_iter: db_iter.peekable(), + snapshot_iterators: snapshot_iterators + .into_iter() + .map(|iter| iter.peekable()) + .collect(), + next_value_locations: Vec::with_capacity(max_values_size), + direction, + } + } +} + +#[derive(Debug)] +enum DataLocation { + Db, + // Index inside `snapshot_iterators` + Snapshot(usize), +} + +impl<'a, SnapshotIter> Iterator for CacheContainerIter<'a, SnapshotIter> +where + SnapshotIter: Iterator, +{ + type Item = (SchemaKey, SchemaValue); + + fn next(&mut self) -> Option { + loop { + let mut next_value: Option<&SchemaKey> = None; + self.next_value_locations.clear(); + if let Some((db_key, _)) = self.db_iter.peek() { + self.next_value_locations.push(DataLocation::Db); + next_value = Some(db_key); + }; + + for (idx, iter) in self.snapshot_iterators.iter_mut().enumerate() { + if let Some(&(peeked_key, _)) = iter.peek() { + match next_value { + None => { + self.next_value_locations.push(Snapshot(idx)); + next_value = Some(peeked_key); + } + Some(next_key) => match (&self.direction, peeked_key.cmp(next_key)) { + (ScanDirection::Backward, Ordering::Greater) + | (ScanDirection::Forward, Ordering::Less) => { + next_value = Some(peeked_key); + self.next_value_locations.clear(); + self.next_value_locations.push(Snapshot(idx)); + } + (_, Ordering::Equal) => { + self.next_value_locations.push(Snapshot(idx)); + } + _ => {} + }, + }; + } + } + + // Rightmost location is from the latest snapshot, so it has priority over all past 
snapshot/DB + if let Some(latest_next_location) = self.next_value_locations.pop() { + // Move all iterators to next value + for location in &self.next_value_locations { + match location { + DataLocation::Db => { + let _ = self.db_iter.next().unwrap(); + } + Snapshot(idx) => { + let _ = self.snapshot_iterators[*idx].next().unwrap(); + } + } + } + + // Handle next value + match latest_next_location { + DataLocation::Db => { + let (key, value) = self.db_iter.next().unwrap(); + return Some((key, value)); + } + Snapshot(idx) => { + let (key, operation) = self.snapshot_iterators[idx].next().unwrap(); + match operation { + Operation::Put { value } => { + return Some((key.to_vec(), value.to_vec())) + } + Operation::Delete => continue, + } + } + }; + } else { + break; + } + } + + None + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::{Arc, RwLock, RwLockReadGuard}; + + use proptest::prelude::*; + + use crate::cache::cache_container::{CacheContainer, CacheContainerIter}; + use crate::cache::cache_db::CacheDb; + use crate::iterator::ScanDirection; + use crate::schema::{KeyDecoder, KeyEncoder, Schema, ValueCodec}; + use crate::test::TestField; + use crate::{define_schema, Operation, SchemaBatch, SchemaKey, SchemaValue, DB}; + + const DUMMY_STATE_CF: &str = "DummyStateCF"; + + define_schema!(DummyStateSchema, TestField, TestField, DUMMY_STATE_CF); + type S = DummyStateSchema; + + fn create_test_db(path: &std::path::Path) -> DB { + let tables = vec![DUMMY_STATE_CF.to_string()]; + let mut db_opts = rocksdb::Options::default(); + db_opts.create_if_missing(true); + db_opts.create_missing_column_families(true); + DB::open(path, "test_db", tables, &db_opts).unwrap() + } + + #[test] + fn test_empty() { + let tempdir = tempfile::tempdir().unwrap(); + let db = create_test_db(tempdir.path()); + let snapshot_manager = + CacheContainer::new(db, Arc::new(RwLock::new(HashMap::new())).into()); + assert!(snapshot_manager.is_empty()); + } + + #[test] + fn 
test_add_and_discard_snapshot() { + let tempdir = tempfile::tempdir().unwrap(); + let db = create_test_db(tempdir.path()); + let to_parent = Arc::new(RwLock::new(HashMap::new())); + let snapshot_manager = Arc::new(RwLock::new(CacheContainer::new(db, to_parent.into()))); + + let snapshot_id = 1; + let db_snapshot = CacheDb::new(snapshot_id, snapshot_manager.clone().into()); + + { + let mut snapshot_manager = snapshot_manager.write().unwrap(); + snapshot_manager.add_snapshot(db_snapshot.into()).unwrap(); + assert!(!snapshot_manager.is_empty()); + snapshot_manager.discard_snapshot(&snapshot_id); + assert!(snapshot_manager.is_empty()); + } + } + + #[test] + #[should_panic(expected = "Attempt to double save same snapshot")] + fn test_add_twice() { + let tempdir = tempfile::tempdir().unwrap(); + let db = create_test_db(tempdir.path()); + let to_parent = Arc::new(RwLock::new(HashMap::new())); + let snapshot_manager = Arc::new(RwLock::new(CacheContainer::new(db, to_parent.into()))); + + let snapshot_id = 1; + // Both share the same ID + let db_snapshot_1 = CacheDb::new(snapshot_id, snapshot_manager.clone().into()); + let db_snapshot_2 = CacheDb::new(snapshot_id, snapshot_manager.clone().into()); + + { + let mut snapshot_manager = snapshot_manager.write().unwrap(); + snapshot_manager.add_snapshot(db_snapshot_1.into()).unwrap(); + assert!(!snapshot_manager.is_empty()); + snapshot_manager.add_snapshot(db_snapshot_2.into()).unwrap(); + } + } + + #[test] + #[should_panic(expected = "Attempt to commit unknown snapshot")] + fn test_commit_unknown() { + let tempdir = tempfile::tempdir().unwrap(); + let db = create_test_db(tempdir.path()); + let to_parent = Arc::new(RwLock::new(HashMap::new())); + let mut cache_container = CacheContainer::new(db, to_parent.into()); + + cache_container.commit_snapshot(&1).unwrap(); + } + + #[test] + fn test_discard_unknown() { + // Discarding unknown snapshots are fine. + // As it possible that caller didn't save it previously. 
+ let tempdir = tempfile::tempdir().unwrap(); + let db = create_test_db(tempdir.path()); + let to_parent = Arc::new(RwLock::new(HashMap::new())); + let mut cache_container = CacheContainer::new(db, to_parent.into()); + + cache_container.discard_snapshot(&1); + } + + #[test] + fn test_commit_snapshot() { + let tempdir = tempfile::tempdir().unwrap(); + let db = create_test_db(tempdir.path()); + let to_parent = Arc::new(RwLock::new(HashMap::new())); + let snapshot_manager = Arc::new(RwLock::new(CacheContainer::new(db, to_parent.into()))); + + let snapshot_id = 1; + let db_snapshot = CacheDb::new(snapshot_id, snapshot_manager.clone().into()); + + { + let mut snapshot_manager = snapshot_manager.write().unwrap(); + snapshot_manager.add_snapshot(db_snapshot.into()).unwrap(); + let result = snapshot_manager.commit_snapshot(&snapshot_id); + assert!(result.is_ok()); + assert!(snapshot_manager.is_empty()); + } + } + + #[test] + fn test_query_unknown_snapshot_id() { + let tempdir = tempfile::tempdir().unwrap(); + let db = create_test_db(tempdir.path()); + let to_parent = Arc::new(RwLock::new(HashMap::new())); + let snapshot_manager = CacheContainer::new(db, to_parent.into()); + assert_eq!(None, snapshot_manager.get::(1, &TestField(1)).unwrap()); + } + + #[test] + fn test_query_genesis_snapshot() { + let tempdir = tempfile::tempdir().unwrap(); + let db = create_test_db(tempdir.path()); + let to_parent = Arc::new(RwLock::new(HashMap::new())); + + let one = TestField(1); + let two = TestField(2); + let three = TestField(3); + + let mut db_data = SchemaBatch::new(); + db_data.put::(&one, &one).unwrap(); + db_data.put::(&three, &three).unwrap(); + db.write_schemas(db_data).unwrap(); + + let snapshot_manager = Arc::new(RwLock::new(CacheContainer::new(db, to_parent.into()))); + + let db_snapshot = CacheDb::new(1, snapshot_manager.clone().into()); + db_snapshot.put::(&two, &two).unwrap(); + db_snapshot.delete::(&three).unwrap(); + + { + let mut snapshot_manager = 
snapshot_manager.write().unwrap(); + snapshot_manager.add_snapshot(db_snapshot.into()).unwrap(); + + // Effectively querying database: + assert_eq!(Some(one), snapshot_manager.get::(1, &one).unwrap()); + assert_eq!(None, snapshot_manager.get::(1, &two).unwrap()); + assert_eq!(Some(three), snapshot_manager.get::(1, &three).unwrap()); + } + } + + #[test] + fn test_query_lifecycle() { + let tempdir = tempfile::tempdir().unwrap(); + let db = create_test_db(tempdir.path()); + let to_parent = Arc::new(RwLock::new(HashMap::new())); + { + // / -> 6 -> 7 + // DB -> 1 -> 2 -> 3 + // \ -> 4 -> 5 + let mut edit = to_parent.write().unwrap(); + edit.insert(3, 2); + edit.insert(2, 1); + edit.insert(4, 1); + edit.insert(5, 4); + edit.insert(6, 2); + edit.insert(7, 6); + } + + let f1 = TestField(1); + let f2 = TestField(2); + let f3 = TestField(3); + let f4 = TestField(4); + let f5 = TestField(5); + let f6 = TestField(6); + let f7 = TestField(7); + let f8 = TestField(8); + + let mut db_data = SchemaBatch::new(); + db_data.put::(&f1, &f1).unwrap(); + db.write_schemas(db_data).unwrap(); + + let snapshot_manager = Arc::new(RwLock::new(CacheContainer::new(db, to_parent.into()))); + + // Operations: + // | snapshot_id | key | operation | + // | DB | 1 | write(1) | + // | 1 | 2 | write(2) | + // | 1 | 3 | write(4) | + // | 2 | 1 | write(5) | + // | 2 | 2 | delete | + // | 4 | 3 | write(6) | + // | 6 | 1 | write(7) | + // | 6 | 2 | write(8) | + + // 1 + let db_snapshot = CacheDb::new(1, snapshot_manager.clone().into()); + db_snapshot.put::(&f2, &f2).unwrap(); + db_snapshot.put::(&f3, &f4).unwrap(); + { + let mut snapshot_manager = snapshot_manager.write().unwrap(); + snapshot_manager.add_snapshot(db_snapshot.into()).unwrap(); + } + + // 2 + let db_snapshot = CacheDb::new(2, snapshot_manager.clone().into()); + db_snapshot.put::(&f1, &f5).unwrap(); + db_snapshot.delete::(&f2).unwrap(); + { + let mut snapshot_manager = snapshot_manager.write().unwrap(); + 
snapshot_manager.add_snapshot(db_snapshot.into()).unwrap(); + } + + // 3 + let db_snapshot = CacheDb::new(3, snapshot_manager.clone().into()); + { + let mut snapshot_manager = snapshot_manager.write().unwrap(); + snapshot_manager.add_snapshot(db_snapshot.into()).unwrap(); + } + + // 4 + let db_snapshot = CacheDb::new(4, snapshot_manager.clone().into()); + db_snapshot.put::(&f3, &f6).unwrap(); + { + let mut snapshot_manager = snapshot_manager.write().unwrap(); + snapshot_manager.add_snapshot(db_snapshot.into()).unwrap(); + } + + // 5 + let db_snapshot = CacheDb::new(5, snapshot_manager.clone().into()); + { + let mut snapshot_manager = snapshot_manager.write().unwrap(); + snapshot_manager.add_snapshot(db_snapshot.into()).unwrap(); + } + + // 6 + let db_snapshot = CacheDb::new(6, snapshot_manager.clone().into()); + db_snapshot.put::(&f1, &f7).unwrap(); + db_snapshot.put::(&f2, &f8).unwrap(); + { + let mut snapshot_manager = snapshot_manager.write().unwrap(); + snapshot_manager.add_snapshot(db_snapshot.into()).unwrap(); + } + + // 7 + let db_snapshot = CacheDb::new(7, snapshot_manager.clone().into()); + { + let mut snapshot_manager = snapshot_manager.write().unwrap(); + snapshot_manager.add_snapshot(db_snapshot.into()).unwrap(); + } + + // View: + // | from s_id | key | value | + // | 3 | 1 | 5 | + // | 3 | 2 | None | + // | 3 | 3 | 4 | + // | 5 | 1 | 1 | + // | 5 | 2 | 2 | + // | 5 | 3 | 6 | + // | 7 | 1 | 7 | + // | 7 | 2 | 8 | + // | 7 | 3 | 4 | + let snapshot_manager = snapshot_manager.read().unwrap(); + assert_eq!(Some(f5), snapshot_manager.get::(3, &f1).unwrap()); + assert_eq!(None, snapshot_manager.get::(3, &f2).unwrap()); + assert_eq!(Some(f4), snapshot_manager.get::(3, &f3).unwrap()); + assert_eq!(Some(f1), snapshot_manager.get::(5, &f1).unwrap()); + assert_eq!(Some(f2), snapshot_manager.get::(5, &f2).unwrap()); + assert_eq!(Some(f6), snapshot_manager.get::(5, &f3).unwrap()); + + assert_eq!(Some(f7), snapshot_manager.get::(7, &f1).unwrap()); + 
assert_eq!(Some(f8), snapshot_manager.get::(7, &f2).unwrap()); + assert_eq!(Some(f4), snapshot_manager.get::(7, &f3).unwrap()); + } + + fn collect_actual_values<'a, I: Iterator>( + iterator: CacheContainerIter<'a, I>, + ) -> Vec<(TestField, TestField)> { + iterator + .into_iter() + .map(|(k, v)| { + let key = <::Key as KeyDecoder>::decode_key(&k).unwrap(); + let value = <::Value as ValueCodec>::decode_value(&v).unwrap(); + (key, value) + }) + .collect() + } + + fn encode_key(field: &TestField) -> SchemaKey { + >::encode_key(field).unwrap() + } + + #[test] + fn test_iterator() { + let tempdir = tempfile::tempdir().unwrap(); + let db = create_test_db(tempdir.path()); + let to_parent = Arc::new(RwLock::new(HashMap::new())); + { + // DB -> 1 -> 2 -> 3 + let mut edit = to_parent.write().unwrap(); + edit.insert(2, 1); + edit.insert(3, 2); + edit.insert(4, 3); + } + + let f0 = TestField(0); + let f1 = TestField(1); + let f2 = TestField(2); + let f3 = TestField(3); + let f4 = TestField(4); + let f5 = TestField(5); + let f6 = TestField(6); + let f7 = TestField(7); + let f8 = TestField(8); + let f9 = TestField(9); + let f10 = TestField(10); + let f11 = TestField(11); + let f12 = TestField(12); + + let mut db_data = SchemaBatch::new(); + db_data.put::(&f3, &f9).unwrap(); + db_data.put::(&f2, &f1).unwrap(); + db_data.put::(&f4, &f1).unwrap(); + db_data.put::(&f0, &f1).unwrap(); + db_data.put::(&f11, &f9).unwrap(); + db.write_schemas(db_data).unwrap(); + // DB Data + // | key | value | + // | 3 | 9 | + // | 2 | 1 | + // | 4 | 1 | + // | 0 | 1 | + // | 11 | 9 | + + let snapshot_manager = Arc::new(RwLock::new(CacheContainer::new(db, to_parent.into()))); + + // Operations: + // | snapshot_id | key | operation | + // | 1 | 1 | write(8) | + // | 1 | 5 | write(7) | + // | 1 | 8 | write(3) | + // | 1 | 4 | write(2) | + // | 2 | 10 | write(2) | + // | 2 | 9 | write(4) | + // | 2 | 4 | delete | + // | 2 | 2 | write(6) | + // | 3 | 8 | write(6) | + // | 3 | 9 | delete | + // | 3 | 12 | 
write(1) | + // | 3 | 1 | write(2) | + + // 1 + let db_snapshot = CacheDb::new(1, snapshot_manager.clone().into()); + db_snapshot.put::(&f1, &f8).unwrap(); + db_snapshot.put::(&f5, &f7).unwrap(); + db_snapshot.put::(&f8, &f3).unwrap(); + db_snapshot.put::(&f4, &f2).unwrap(); + { + let mut snapshot_manager = snapshot_manager.write().unwrap(); + snapshot_manager.add_snapshot(db_snapshot.into()).unwrap(); + } + + // 2 + let db_snapshot = CacheDb::new(2, snapshot_manager.clone().into()); + db_snapshot.put::(&f10, &f2).unwrap(); + db_snapshot.put::(&f9, &f4).unwrap(); + db_snapshot.delete::(&f4).unwrap(); + db_snapshot.put::(&f2, &f6).unwrap(); + { + let mut snapshot_manager = snapshot_manager.write().unwrap(); + snapshot_manager.add_snapshot(db_snapshot.into()).unwrap(); + } + + // 3 + let db_snapshot = CacheDb::new(3, snapshot_manager.clone().into()); + db_snapshot.put::(&f8, &f6).unwrap(); + db_snapshot.delete::(&f9).unwrap(); + db_snapshot.put::(&f12, &f1).unwrap(); + db_snapshot.put::(&f1, &f2).unwrap(); + { + let mut snapshot_manager = snapshot_manager.write().unwrap(); + snapshot_manager.add_snapshot(db_snapshot.into()).unwrap(); + } + + // Expected Order + // | key | value | + // | 12 | 1 | + // | 10 | 2 | + // | 8 | 6 | + // | 5 | 7 | + // | 3 | 9 | + // | 2 | 6 | + // | 1 | 2 | + + let snapshot_manager = snapshot_manager.read().unwrap(); + + // Full iterator + { + let mut expected_fields = vec![ + (f0, f1), + (f1, f2), + (f2, f6), + (f3, f9), + (f5, f7), + (f8, f6), + (f10, f2), + (f11, f9), + (f12, f1), + ]; + + let forward_iterator = snapshot_manager.iter::(4).unwrap(); + let actual_fields = collect_actual_values(forward_iterator); + + assert_eq!( + expected_fields, actual_fields, + "forward iterator is incorrect" + ); + + expected_fields.reverse(); + + let backward_iterator = snapshot_manager.rev_iter::(4).unwrap(); + let actual_fields = collect_actual_values(backward_iterator); + assert_eq!( + expected_fields, actual_fields, + "backward iterator is 
incorrect" + ); + } + + // Range iterator + { + let lower_bound = encode_key(&f2); + let upper_bound = encode_key(&f10); + + // Full range + let mut expected_fields = vec![(f2, f6), (f3, f9), (f5, f7), (f8, f6)]; + + let forward_iterator = snapshot_manager + .iter_range::(4, lower_bound.clone()..upper_bound.clone()) + .unwrap(); + let actual_fields = collect_actual_values(forward_iterator); + assert_eq!( + expected_fields, actual_fields, + "forward full range iterator is incorrect" + ); + let backward_iterator = snapshot_manager + .rev_iter_range::(4, lower_bound.clone()..upper_bound.clone()) + .unwrap(); + let actual_fields = collect_actual_values(backward_iterator); + expected_fields.reverse(); + assert_eq!( + expected_fields, actual_fields, + "backward full range iterator is incorrect" + ); + + // Only lower bound + let mut expected_fields = vec![ + (f2, f6), + (f3, f9), + (f5, f7), + (f8, f6), + (f10, f2), + (f11, f9), + (f12, f1), + ]; + let forward_iterator = snapshot_manager + .iter_range::(4, lower_bound.clone()..) + .unwrap(); + let actual_fields = collect_actual_values(forward_iterator); + assert_eq!( + expected_fields, actual_fields, + "forward low range iterator is incorrect" + ); + let backward_iterator = snapshot_manager + .rev_iter_range::(4, lower_bound..) 
+ .unwrap(); + let actual_fields = collect_actual_values(backward_iterator); + expected_fields.reverse(); + assert_eq!( + expected_fields, actual_fields, + "backward low range iterator is incorrect" + ); + + // Only upper bound + let mut expected_fields = + vec![(f0, f1), (f1, f2), (f2, f6), (f3, f9), (f5, f7), (f8, f6)]; + let forward_iterator = snapshot_manager + .iter_range::(4, ..upper_bound.clone()) + .unwrap(); + let actual_fields = collect_actual_values(forward_iterator); + assert_eq!( + expected_fields, actual_fields, + "forward high range iterator is incorrect" + ); + let backward_iterator = snapshot_manager + .rev_iter_range::(4, ..upper_bound) + .unwrap(); + let actual_fields = collect_actual_values(backward_iterator); + expected_fields.reverse(); + assert_eq!( + expected_fields, actual_fields, + "backward high range iterator is incorrect" + ); + } + } + + fn test_iterator_ranges(numbers: Vec) { + // Spreads all numbers into 10 changesets, where first 3 are saved to the database + // Each number should be above 3 and below u32::MAX - 3 + assert!(numbers.len() >= 100); + + // Some numbers for ranges + let min = *numbers.iter().min().unwrap(); + let below_min = min + .checked_sub(3) + .expect("test input is not in defined range"); + let max = *numbers.iter().max().unwrap(); + let above_max = max + .checked_add(3) + .expect("test input is not in defined range"); + let middle = *numbers.get(numbers.len() / 2).unwrap(); + + let range_pairs = vec![ + (min, middle), + (below_min, middle), + (middle, max), + (middle, above_max), + (min, max), + (below_min, above_max), + ]; + + // Prepare cache container + let tempdir = tempfile::tempdir().unwrap(); + let db = create_test_db(tempdir.path()); + let to_parent = Arc::new(RwLock::new(HashMap::new())); + { + let mut to_parent = to_parent.write().unwrap(); + for id in 4..=10 { + to_parent.insert(id, id - 1); + } + } + + let cache_container = Arc::new(RwLock::new(CacheContainer::new(db, to_parent.into()))); + + // 
Creating snapshots + let n_chunks = 10; // Number of chunks you want + + let chunk_size = (numbers.len() + n_chunks - 1) / n_chunks; // Calculate size of each chunk + let chunks: Vec<_> = numbers.chunks(chunk_size).map(|c| c.to_vec()).collect(); + + for (idx, chunk) in chunks.iter().enumerate() { + let snapshot_id = idx as u64; + let db_snapshot = CacheDb::new(snapshot_id, cache_container.clone().into()); + for item in chunk { + db_snapshot + .put::(&TestField(*item), &TestField(*item)) + .unwrap(); + } + + { + let mut cache_container = cache_container.write().unwrap(); + cache_container.add_snapshot(db_snapshot.into()).unwrap(); + if idx < 3 { + cache_container.commit_snapshot(&snapshot_id).unwrap(); + } + } + } + + for (low, high) in range_pairs { + let low = TestField(low); + let high = TestField(high); + let range = encode_key(&low)..encode_key(&high); + let range_inclusive = encode_key(&low)..=encode_key(&high); + let range_to = ..encode_key(&high); + let range_to_inclusive = ..=encode_key(&high); + let range_from = encode_key(&low)..; + let range_full = ..; + + check_range(cache_container.read().unwrap(), range); + check_range(cache_container.read().unwrap(), range_inclusive); + check_range(cache_container.read().unwrap(), range_to); + check_range(cache_container.read().unwrap(), range_to_inclusive); + check_range(cache_container.read().unwrap(), range_from); + check_range(cache_container.read().unwrap(), range_full); + } + } + + fn check_range + Clone>( + cache_container: RwLockReadGuard, + range: R, + ) { + let iterator_forward = cache_container.iter_range::(10, range.clone()).unwrap(); + validate_iterator(iterator_forward, range.clone(), ScanDirection::Forward); + let iterator_backward = cache_container + .rev_iter_range::(10, range.clone()) + .unwrap(); + validate_iterator(iterator_backward, range, ScanDirection::Backward); + } + + fn validate_iterator(iterator: I, range: R, direction: ScanDirection) + where + I: Iterator, + R: std::ops::RangeBounds, + { 
+ let mut prev_key: Option = None; + for (key, _) in iterator { + assert!(range.contains(&key)); + let key = <::Key as KeyDecoder>::decode_key(&key).unwrap(); + if let Some(prev_key) = prev_key { + match direction { + ScanDirection::Forward => { + assert!(key.0 >= prev_key.0) + } + ScanDirection::Backward => { + assert!(key.0 <= prev_key.0) + } + }; + } + prev_key = Some(key); + } + } + + #[test] + fn check_proptest_case() { + let numbers: Vec = (10..=113).collect(); + test_iterator_ranges(numbers); + } + + proptest! { + #[test] + fn cache_container_iter_range(input in prop::collection::vec(4u32..10_000, 101..200)) { + test_iterator_ranges(input); + } + + #[test] + fn cache_container_iter_range_tight_values(input in prop::collection::vec(4u32..10, 101..200)) { + test_iterator_ranges(input); + } + + fn cache_container_iter_range_uniq_numbers(input in prop::collection::hash_set(4u32..10_000, 101..200)) { + let input: Vec = input.into_iter().collect(); + test_iterator_ranges(input); + } + } +} diff --git a/src/cache/cache_db.rs b/src/cache/cache_db.rs new file mode 100644 index 0000000..67a5a60 --- /dev/null +++ b/src/cache/cache_db.rs @@ -0,0 +1,581 @@ +//! This module is main entry point into cache layering subsystem. +use std::iter::Peekable; +use std::sync::Mutex; + +use crate::cache::cache_container::CacheContainer; +use crate::cache::change_set::ChangeSet; +use crate::cache::SnapshotId; +use crate::iterator::ScanDirection; +use crate::schema::KeyDecoder; +use crate::{ + KeyCodec, Operation, ReadOnlyLock, Schema, SchemaBatch, SchemaKey, SchemaValue, SeekKeyEncoder, + ValueCodec, +}; + +/// Cache layer that stores all writes locally and also able to access "previous" operations. 
+#[derive(Debug)] +pub struct CacheDb { + local_cache: Mutex, + db: ReadOnlyLock, +} + +/// Response for a paginated query which also includes the "next" key to pass +#[derive(Debug)] +pub struct PaginatedResponse { + /// A vector of storage keys and their values + pub key_value: Vec<(S::Key, S::Value)>, + /// Key indicating the first key after the final pair from key_value. + /// Meant to be passed in in subsequent queries + pub next: Option, +} + +impl CacheDb { + /// Create new [`CacheDb`] pointing to given [`CacheContainer`] + pub fn new(id: SnapshotId, cache_container: ReadOnlyLock) -> Self { + Self { + local_cache: Mutex::new(ChangeSet::new(id)), + db: cache_container, + } + } + + /// Store a value in local cache + pub fn put( + &self, + key: &impl KeyCodec, + value: &impl ValueCodec, + ) -> anyhow::Result<()> { + self.local_cache + .lock() + .expect("Local ChangeSet lock must not be poisoned") + .operations + .put(key, value) + } + + /// Delete given key from local cache + pub fn delete(&self, key: &impl KeyCodec) -> anyhow::Result<()> { + self.local_cache + .lock() + .expect("Local ChangeSet lock must not be poisoned") + .operations + .delete(key) + } + + /// Writes many operations at once in local cache, atomically. 
+ pub fn write_many(&self, batch: SchemaBatch) -> anyhow::Result<()> { + let mut inner = self + .local_cache + .lock() + .expect("Local SchemaBatch lock must not be poisoned"); + inner.operations.merge(batch); + Ok(()) + } + + /// Overwrites inner cache with new, while retaining reference to parent + pub fn overwrite_change_set(&self, other: CacheDb) { + let mut this_cache = self.local_cache.lock().unwrap(); + let other_cache = other.local_cache.into_inner().unwrap(); + *this_cache = other_cache; + } + + /// Get a value from current snapshot, its parents or underlying database + pub fn read(&self, key: &impl KeyCodec) -> anyhow::Result> { + // Some(Operation) means that key was touched, + // but in case of deletion we early return None + // Only in case of not finding operation for key, + // we go deeper + + // Hold local cache lock explicitly, so reads are atomic + let local_cache = self + .local_cache + .lock() + .expect("SchemaBatch lock should not be poisoned"); + + // 1. Check in cache + if let Some(operation) = local_cache.get(key)? { + return operation.decode_value::(); + } + + // 2. 
Check parent + let parent = self.db.read().expect("Parent lock must not be poisoned"); + parent.get::(local_cache.id(), key) + } + + /// Get value of largest key written value for given [`Schema`] + pub fn get_largest(&self) -> anyhow::Result> { + let change_set = self + .local_cache + .lock() + .expect("SchemaBatch lock must not be poisoned"); + let local_cache_iter = change_set.iter::().rev(); + + let parent = self.db.read().expect("Parent lock must not be poisoned"); + + let parent_iter = parent.rev_iter::(change_set.id())?; + + let mut combined_iter: CacheDbIter<'_, _, _> = CacheDbIter { + local_cache_iter: local_cache_iter.peekable(), + parent_iter: parent_iter.peekable(), + direction: ScanDirection::Backward, + }; + + if let Some((key, value)) = combined_iter.next() { + let key = S::Key::decode_key(&key)?; + let value = S::Value::decode_value(&value)?; + return Ok(Some((key, value))); + } + + Ok(None) + } + + /// Get largest value in [`Schema`] that is smaller than give `seek_key` + pub fn get_prev( + &self, + seek_key: &impl SeekKeyEncoder, + ) -> anyhow::Result> { + let seek_key = seek_key.encode_seek_key()?; + let range = ..=seek_key; + let change_set = self + .local_cache + .lock() + .expect("Local cache lock must not be poisoned"); + let local_cache_iter = change_set.iter_range::(range.clone()).rev(); + + let parent = self + .db + .read() + .expect("Parent snapshots lock must not be poisoned"); + let parent_iter = parent.rev_iter_range::(change_set.id(), range)?; + + let mut combined_iter: CacheDbIter<'_, _, _> = CacheDbIter { + local_cache_iter: local_cache_iter.peekable(), + parent_iter: parent_iter.peekable(), + direction: ScanDirection::Backward, + }; + + if let Some((key, value)) = combined_iter.next() { + let key = S::Key::decode_key(&key)?; + let value = S::Value::decode_value(&value)?; + return Ok(Some((key, value))); + } + Ok(None) + } + + /// Get `n` keys >= `seek_key` + pub fn get_n_from_first_match( + &self, + seek_key: &impl SeekKeyEncoder, 
+ n: usize, + ) -> anyhow::Result> { + let seek_key = seek_key.encode_seek_key()?; + let range = seek_key..; + let change_set = self + .local_cache + .lock() + .expect("Local cache lock must not be poisoned"); + let local_cache_iter = change_set.iter_range::(range.clone()); + + let parent = self + .db + .read() + .expect("Parent snapshots lock must not be poisoned"); + let parent_iter = parent.iter_range::(change_set.id(), range)?; + + let mut combined_iter: CacheDbIter<'_, _, _> = CacheDbIter { + local_cache_iter: local_cache_iter.peekable(), + parent_iter: parent_iter.peekable(), + direction: ScanDirection::Forward, + }; + + let results: Vec<(S::Key, S::Value)> = combined_iter + .by_ref() + .filter_map(|(key_bytes, value_bytes)| { + let key = S::Key::decode_key(&key_bytes).ok()?; + let value = S::Value::decode_value(&value_bytes).ok()?; + Some((key, value)) + }) + .take(n) + .collect(); + + let next_start_key: Option = combined_iter + .next() + .and_then(|(key_bytes, _)| S::Key::decode_key(&key_bytes).ok()); + + Ok(PaginatedResponse { + key_value: results, + next: next_start_key, + }) + } + + /// Get a clone of internal ChangeSet + pub fn clone_change_set(&self) -> ChangeSet { + let change_set = self + .local_cache + .lock() + .expect("Local change set lock is poisoned"); + change_set.clone() + } + + /// Collects all key-value pairs in given range, from smallest to largest. 
+ pub fn collect_in_range>( + &self, + range: std::ops::Range, + ) -> anyhow::Result> { + let lower_bound = range.start.encode_seek_key()?; + let upper_bound = range.end.encode_seek_key()?; + let range = lower_bound..upper_bound; + let change_set = self + .local_cache + .lock() + .expect("Local cache lock must not be poisoned"); + let local_cache_iter = change_set.iter_range::(range.clone()); + + let parent = self + .db + .read() + .expect("Parent snapshots lock must not be poisoned"); + let parent_iter = parent.iter_range::(change_set.id(), range)?; + + let combined_iter: CacheDbIter<'_, _, _> = CacheDbIter { + local_cache_iter: local_cache_iter.peekable(), + parent_iter: parent_iter.peekable(), + direction: ScanDirection::Forward, + }; + + let result = combined_iter + .map(|(key, value)| { + let key = S::Key::decode_key(&key).unwrap(); + let value = S::Value::decode_value(&value).unwrap(); + (key, value) + }) + .collect(); + + Ok(result) + } +} + +/// Iterator over [`CacheDb`] that combines local cache and parent iterators. +/// Prioritizes local cache over parent in case of equal keys. +/// This is because local cache operations are considered to be newer. +struct CacheDbIter<'a, LocalIter, ParentIter> +where + LocalIter: Iterator, + ParentIter: Iterator, +{ + local_cache_iter: Peekable, + parent_iter: Peekable, + direction: ScanDirection, +} + +impl<'a, LocalIter, ParentIter> Iterator for CacheDbIter<'a, LocalIter, ParentIter> +where + LocalIter: Iterator, + ParentIter: Iterator, +{ + type Item = (SchemaKey, SchemaValue); + + fn next(&mut self) -> Option { + loop { + let Some(&local) = self.local_cache_iter.peek() else { + // Local iterator is exhausted; all reads go to the parent. + return self.parent_iter.next(); + }; + + if let Some(parent) = self.parent_iter.peek() { + // Check if the next parent key has priority over the local key, based on + // the scan direction. 
+ if ((local.0 > &parent.0) && (self.direction == ScanDirection::Forward)) + || ((local.0 < &parent.0) && (self.direction == ScanDirection::Backward)) + { + return self.parent_iter.next(); + } else if local.0 == &parent.0 { + // If keys are equal, we must consume the next parent value + // before we can read from the local cache (or they will + // get out of sync). + self.parent_iter.next(); + } + } + + // At this point, we've determined that we wish to read from the + // local cache. + self.local_cache_iter.next(); + match local.1 { + // No value to read from the local cache, skip it. + Operation::Delete => continue, + Operation::Put { value } => return Some((local.0.clone(), value.clone())), + } + } + } +} + +impl From for ChangeSet { + fn from(value: CacheDb) -> Self { + value + .local_cache + .into_inner() + .expect("Internal cache lock is poisoned") + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::path::Path; + use std::sync::{Arc, RwLock}; + + use rocksdb::DEFAULT_COLUMN_FAMILY_NAME; + + use super::*; + use crate::cache::cache_db::CacheContainer; + use crate::schema::KeyEncoder; + use crate::test::{TestCompositeField, TestField}; + use crate::{define_schema, DB}; + define_schema!(TestSchema, TestCompositeField, TestField, "TestCF"); + + mod local_cache { + use super::*; + + #[test] + fn test_concurrent_operations() { + let tmpdir = tempfile::tempdir().unwrap(); + let db = open_db(tmpdir.path()); + + let cache_container = CacheContainer::orphan(db); + + let cache_db = Arc::new(CacheDb::new( + 0, + Arc::new(RwLock::new(cache_container)).into(), + )); + + let mut handles = vec![]; + for i in 0..10 { + let cache_db_clone = cache_db.clone(); + let handle = std::thread::spawn(move || { + for j in 0..10 { + let key = TestCompositeField(j + 1, j + 2, j + 3); + let value = TestField(i * 10 + j + 4); + cache_db_clone.put::(&key, &value).unwrap(); + } + }); + handles.push(handle); + } + + for handle in handles { + 
handle.join().expect("Thread panicked"); + } + for j in 0..10 { + let key = TestCompositeField(j + 1, j + 2, j + 3); + let value = cache_db.read::(&key).unwrap().unwrap().0; + let possible_range = (j + 4)..(100 + j + 4); + assert!(possible_range.contains(&value)); + let rem = value - (j + 4); + assert_eq!(0, rem % 10); + } + } + } + + fn encode_key(key: &TestCompositeField) -> SchemaKey { + >::encode_key(key).unwrap() + } + + fn encode_value(value: &TestField) -> SchemaValue { + >::encode_value(value).unwrap() + } + + fn open_db(dir: impl AsRef) -> DB { + let column_families = vec![DEFAULT_COLUMN_FAMILY_NAME, TestSchema::COLUMN_FAMILY_NAME]; + let mut db_opts = rocksdb::Options::default(); + db_opts.create_if_missing(true); + db_opts.create_missing_column_families(true); + DB::open(dir, "test", column_families, &db_opts).expect("Failed to open DB.") + } + + #[test] + fn test_db_snapshot_iterator_empty() { + let tmpdir = tempfile::tempdir().unwrap(); + let db = open_db(tmpdir.path()); + + let local_cache = SchemaBatch::new(); + let local_cache_iter = local_cache.iter::().peekable(); + + let cache_container = CacheContainer::orphan(db); + let parent_iter = cache_container.iter::(0).unwrap().peekable(); + + let combined_iter: CacheDbIter<'_, _, _> = CacheDbIter { + local_cache_iter, + parent_iter, + direction: ScanDirection::Forward, + }; + + let values: Vec<(SchemaKey, SchemaValue)> = combined_iter.collect(); + assert!(values.is_empty()) + } + + #[test] + fn test_db_snapshot_iterator_values() { + let tmpdir = tempfile::tempdir().unwrap(); + let db = open_db(tmpdir.path()); + + let k0 = TestCompositeField(0, 0, 0); + let k1 = TestCompositeField(0, 1, 0); + let k2 = TestCompositeField(0, 1, 2); + let k3 = TestCompositeField(3, 1, 0); + let k4 = TestCompositeField(3, 2, 0); + + let mut older_change_set = ChangeSet::new(0); + older_change_set + .operations + .put::(&k2, &TestField(2)) + .unwrap(); + older_change_set + .operations + .put::(&k1, &TestField(1)) + .unwrap(); 
+ older_change_set + .operations + .put::(&k4, &TestField(4)) + .unwrap(); + older_change_set + .operations + .put::(&k3, &TestField(3)) + .unwrap(); + + let to_parent: Arc>> = + Arc::new(RwLock::new(Default::default())); + { + let mut parent = to_parent.write().unwrap(); + parent.insert(1, 0); + } + let mut cache_container = CacheContainer::new(db, to_parent.into()); + cache_container.add_snapshot(older_change_set).unwrap(); + + let parent_iter = cache_container.iter::(1).unwrap(); + + let mut local_change_set = ChangeSet::new(1); + + local_change_set + .operations + .delete::(&k3) + .unwrap(); + local_change_set + .operations + .put::(&k0, &TestField(100)) + .unwrap(); + local_change_set + .operations + .put::(&k1, &TestField(10)) + .unwrap(); + local_change_set + .operations + .put::(&k2, &TestField(20)) + .unwrap(); + + let local_cache_iter = local_change_set.iter::(); + + let combined_iter: CacheDbIter<'_, _, _> = CacheDbIter { + local_cache_iter: local_cache_iter.peekable(), + parent_iter: parent_iter.peekable(), + direction: ScanDirection::Forward, + }; + + let actual_values: Vec<(SchemaKey, SchemaValue)> = combined_iter.collect(); + let expected_values = vec![ + (encode_key(&k0), encode_value(&TestField(100))), + (encode_key(&k1), encode_value(&TestField(10))), + (encode_key(&k2), encode_value(&TestField(20))), + (encode_key(&k4), encode_value(&TestField(4))), + ]; + + assert_eq!(expected_values, actual_values); + } + + fn put_value(cache_db: &CacheDb, key: u32, value: u32) { + cache_db + .put::(&TestCompositeField(key, 0, 0), &TestField(value)) + .unwrap(); + } + + fn check_value(cache_db: &CacheDb, key: u32, expected_value: Option) { + let actual_value = cache_db + .read::(&TestCompositeField(key, 0, 0)) + .unwrap() + .map(|v| v.0); + assert_eq!(expected_value, actual_value); + } + + #[test] + fn test_iterators_complete() { + let tmpdir = tempfile::tempdir().unwrap(); + let db = open_db(tmpdir.path()); + + let to_parent: Arc>> = + 
Arc::new(RwLock::new(Default::default())); + { + let mut parent = to_parent.write().unwrap(); + parent.insert(2, 1); + parent.insert(3, 2); + parent.insert(4, 3); + } + + let cache_container = Arc::new(RwLock::new(CacheContainer::new(db, to_parent.into()))); + + let db_values = CacheDb::new(0, cache_container.clone().into()); + + put_value(&db_values, 2, 1); + put_value(&db_values, 4, 1); + put_value(&db_values, 3, 9); + { + let mut cache_container = cache_container.write().unwrap(); + cache_container.add_snapshot(db_values.into()).unwrap(); + cache_container.commit_snapshot(&0).unwrap(); + assert!(cache_container.is_empty()); + } + + let cache_db_1 = CacheDb::new(1, cache_container.clone().into()); + put_value(&cache_db_1, 1, 8); + put_value(&cache_db_1, 4, 2); + put_value(&cache_db_1, 5, 7); + put_value(&cache_db_1, 8, 3); + { + let mut cache_container = cache_container.write().unwrap(); + cache_container.add_snapshot(cache_db_1.into()).unwrap(); + } + let cache_db_2 = CacheDb::new(2, cache_container.clone().into()); + put_value(&cache_db_2, 2, 6); + cache_db_2 + .delete::(&TestCompositeField(4, 0, 0)) + .unwrap(); + put_value(&cache_db_2, 9, 4); + put_value(&cache_db_2, 10, 2); + { + let mut cache_container = cache_container.write().unwrap(); + cache_container.add_snapshot(cache_db_2.into()).unwrap(); + } + + let cache_db_3 = CacheDb::new(3, cache_container.into()); + put_value(&cache_db_3, 1, 2); + put_value(&cache_db_3, 8, 6); + cache_db_3 + .delete::(&TestCompositeField(9, 0, 0)) + .unwrap(); + put_value(&cache_db_3, 12, 1); + + // Checking + check_value(&cache_db_3, 5, Some(7)); + check_value(&cache_db_3, 4, None); + + let actual_kv_sorted = cache_db_3 + .collect_in_range::( + TestCompositeField(0, 0, 0)..TestCompositeField(100, 0, 0), + ) + .unwrap() + .into_iter() + .map(|(k, v)| (k.0, v.0)) + .collect::>(); + + let expected_kv_sorted = vec![(1, 2), (2, 6), (3, 9), (5, 7), (8, 6), (10, 2), (12, 1)]; + + assert_eq!(expected_kv_sorted, actual_kv_sorted); + } 
+ + // TODO: Proptest here +} diff --git a/src/cache/change_set.rs b/src/cache/change_set.rs new file mode 100644 index 0000000..5abd6b8 --- /dev/null +++ b/src/cache/change_set.rs @@ -0,0 +1,56 @@ +//! Collection of writes in given snapshot/cache layer +//! Data in ChangeSet is written inside crate, public access allows only reading. +use std::collections::btree_map; + +use crate::cache::SnapshotId; +use crate::{KeyCodec, Operation, Schema, SchemaBatch, SchemaKey}; + +/// Iterator type returned by [`ChangeSet::iter`]. +pub type ChangeSetIter<'a> = btree_map::Iter<'a, SchemaKey, Operation>; +/// Range type returned by [`ChangeSet::iter_range`]. +pub type ChangeSetRange<'a> = btree_map::Range<'a, SchemaKey, Operation>; + +/// Collection of all writes with associated [`SnapshotId`] +#[derive(Debug, Clone)] +pub struct ChangeSet { + id: SnapshotId, + pub(crate) operations: SchemaBatch, +} + +impl ChangeSet { + pub(crate) fn new(id: SnapshotId) -> Self { + Self { + id, + operations: SchemaBatch::default(), + } + } + + /// Get value from its own cache + pub fn get(&self, key: &impl KeyCodec) -> anyhow::Result> { + self.operations.get(key) + } + + /// Get the ID of this [`ChangeSet`]. + pub fn id(&self) -> SnapshotId { + self.id + } + + /// Iterate over all operations in snapshot in lexicographic order + pub fn iter(&self) -> ChangeSetIter { + self.operations.iter::() + } + + /// Iterate over operations in lower_bound..upper_bound range in lexicographic order + pub fn iter_range( + &self, + range: impl std::ops::RangeBounds, + ) -> ChangeSetRange { + self.operations.iter_range::(range) + } +} + +impl From for SchemaBatch { + fn from(value: ChangeSet) -> Self { + value.operations + } +} diff --git a/src/cache/mod.rs b/src/cache/mod.rs new file mode 100644 index 0000000..635ad1c --- /dev/null +++ b/src/cache/mod.rs @@ -0,0 +1,8 @@ +//! All structs related to caching layer of Rockbound. 
+ +pub mod cache_container; +pub mod cache_db; +pub mod change_set; + +/// Id of ChangeSet/snapshot/cache layer +pub type SnapshotId = u64; diff --git a/src/iterator.rs b/src/iterator.rs index 0a3d7ae..cd9fdc6 100644 --- a/src/iterator.rs +++ b/src/iterator.rs @@ -2,8 +2,9 @@ use std::iter::FusedIterator; use std::marker::PhantomData; use anyhow::Result; +use rocksdb::{ColumnFamily, ReadOptions}; -use crate::metrics::{ROCKBOUND_ITER_BYTES, ROCKBOUND_ITER_LATENCY_SECONDS}; +use crate::metrics::{SCHEMADB_ITER_BYTES, SCHEMADB_ITER_LATENCY_SECONDS}; use crate::schema::{KeyDecoder, Schema, ValueCodec}; use crate::{SchemaKey, SchemaValue}; @@ -29,6 +30,8 @@ pub trait SeekKeyEncoder: Sized { fn encode_seek_key(&self) -> crate::schema::Result>; } +/// Indicates in which direction iterator should be scanned. +#[derive(Debug, Clone, PartialEq)] pub(crate) enum ScanDirection { Forward, Backward, @@ -96,7 +99,7 @@ where } fn next_impl(&mut self) -> Result>> { - let _timer = ROCKBOUND_ITER_LATENCY_SECONDS + let _timer = SCHEMADB_ITER_LATENCY_SECONDS .with_label_values(&[S::COLUMN_FAMILY_NAME]) .start_timer(); @@ -108,7 +111,7 @@ where let raw_key = self.db_iter.key().expect("db_iter.key() failed."); let raw_value = self.db_iter.value().expect("db_iter.value() failed."); let value_size_bytes = raw_value.len(); - ROCKBOUND_ITER_BYTES + SCHEMADB_ITER_BYTES .with_label_values(&[S::COLUMN_FAMILY_NAME]) .observe((raw_key.len() + raw_value.len()) as f64); @@ -154,25 +157,76 @@ where impl<'a, S> FusedIterator for SchemaIterator<'a, S> where S: Schema {} -/// Iterates over given column backwards -pub struct RawDbReverseIterator<'a> { +/// Iterates over given column in [`rocksdb::DB`]. 
+pub(crate) struct RawDbIter<'a> { db_iter: rocksdb::DBRawIterator<'a>, + direction: ScanDirection, + upper_bound: std::ops::Bound, } -impl<'a> RawDbReverseIterator<'a> { - pub(crate) fn new(mut db_iter: rocksdb::DBRawIterator<'a>) -> Self { - db_iter.seek_to_last(); - RawDbReverseIterator { db_iter } - } +impl<'a> RawDbIter<'a> { + pub(crate) fn new( + inner: &'a rocksdb::DB, + cf_handle: &ColumnFamily, + range: impl std::ops::RangeBounds, + direction: ScanDirection, + ) -> Self { + if let std::ops::Bound::Excluded(_) = range.start_bound() { + panic!("Excluded start_bound is not supported"); + } - /// Navigate iterator go given key - pub fn seek(&mut self, seek_key: SchemaKey) -> Result<()> { - self.db_iter.seek_for_prev(&seek_key); - Ok(()) + // | | ScanDirection::Forward | ScanDirection::Backward | + // |------------------|-----------------------------------------------|----------------------------------------------| + // | | iterator init | bounds check | iterator init | bounds check | + // |------------------|-----------------------|-----------------------|-----------------------|----------------------| + // | start::Excluded | panic | | panic | | + // | start::Included | db_iter.seek_to_first | opts.set_lower_bound | | opts.set_lower_bound | + // | start::Unbounded | db_iter.seek_to_first | N/A | | | + // | end::Excluded | | opts.set_upper_bound | db_iter.seek_to_last | opts.set_upper_bound | + // | end::Included | | check inside `next()` | db_iter.seek_for_prev | via db_iter.seek* | + // | end::Unbounded | | N/A | db_iter.seek_to_last | via db_iter.seek* | + + // Configure options + let mut opts: ReadOptions = Default::default(); + // It is safe to always start_bound, because `Bound::Excluded` was excluded before + if let std::ops::Bound::Included(lower) = range.start_bound() { + opts.set_iterate_lower_bound(lower.clone()); + } + // end_bound explicitly set only if it's excluded, to match rocksdb API + if let std::ops::Bound::Excluded(upper) = 
range.end_bound() { + opts.set_iterate_upper_bound(upper.clone()); + } + + // + // Configure iterator + let mut db_iter = inner.raw_iterator_cf_opt(cf_handle, opts); + + // Now need to navigate + match direction { + ScanDirection::Forward => { + // Always seek to first for the forward iterator. + db_iter.seek_to_first(); + } + ScanDirection::Backward => match range.end_bound() { + // Seek to last only if it matches rocksdb API + std::ops::Bound::Excluded(_) | std::ops::Bound::Unbounded => { + db_iter.seek_to_last(); + } + // In case of Included upper bound, we have to seek to it manually, to move backwards from there + std::ops::Bound::Included(upper) => { + db_iter.seek_for_prev(upper); + } + }, + }; + RawDbIter { + db_iter, + direction, + upper_bound: range.end_bound().cloned(), + } } } -impl<'a> Iterator for RawDbReverseIterator<'a> { +impl<'a> Iterator for RawDbIter<'a> { type Item = (SchemaKey, SchemaValue); fn next(&mut self) -> Option { @@ -185,8 +239,360 @@ impl<'a> Iterator for RawDbReverseIterator<'a> { // Have to allocate to fix lifetime issue let next_item = (next_item.0.to_vec(), next_item.1.to_vec()); - self.db_iter.prev(); + // If next item is larger than upper bound, we're done + if let std::ops::Bound::Included(upper) = self.upper_bound.as_ref() { + if &next_item.0 > upper { + // That we're moving forward!!! 
+ assert_eq!( + &ScanDirection::Forward, + &self.direction, + "Upper bound exceeded, while moving backward: {:?} {:?} ", + next_item, + upper + ); + return None; + } + } + + match self.direction { + ScanDirection::Forward => self.db_iter.next(), + ScanDirection::Backward => self.db_iter.prev(), + }; Some(next_item) } } + +#[cfg(test)] +mod tests { + use std::path::Path; + + use rocksdb::DEFAULT_COLUMN_FAMILY_NAME; + + use super::*; + use crate::schema::ColumnFamilyName; + use crate::test::TestField; + use crate::{define_schema, DB}; + + define_schema!(TestSchema1, TestField, TestField, "TestCF1"); + + type S = TestSchema1; + + fn get_column_families() -> Vec { + vec![DEFAULT_COLUMN_FAMILY_NAME, S::COLUMN_FAMILY_NAME] + } + + fn open_db(dir: impl AsRef) -> DB { + let mut db_opts = rocksdb::Options::default(); + db_opts.create_if_missing(true); + db_opts.create_missing_column_families(true); + DB::open(dir, "test-iterator", get_column_families(), &db_opts).expect("Failed to open DB.") + } + + mod raw_db_iterator { + use proptest::prelude::*; + + use super::*; + use crate::schema::KeyEncoder; + + #[test] + fn test_empty_raw_iterator() { + let tmpdir = tempfile::tempdir().unwrap(); + let db = open_db(tmpdir.path()); + + let iter_forward = db.raw_iter::(ScanDirection::Forward).unwrap(); + let count_forward = iter_forward.count(); + assert_eq!(0, count_forward); + + let iter_backward = db.raw_iter::(ScanDirection::Backward).unwrap(); + let count_backward = iter_backward.count(); + assert_eq!(0, count_backward); + } + + fn collect_actual_values(iter: RawDbIter) -> Vec<(u32, u32)> { + iter.map(|(key, value)| { + let key = <::Key as KeyDecoder>::decode_key(&key).unwrap(); + let value = <::Value as ValueCodec>::decode_value(&value).unwrap(); + (key.0, value.0) + }) + .collect::>() + } + + fn encode_key(field: &TestField) -> SchemaKey { + >::encode_key(field).unwrap() + } + + fn test_iterator( + db: &DB, + prefix: &'static str, + range: impl std::ops::RangeBounds + Clone, + 
mut expected_values: Vec<(u32, u32)>, + ) { + let iter_range_forward = db + .raw_iter_range::(range.clone(), ScanDirection::Forward) + .unwrap(); + let actual_values = collect_actual_values(iter_range_forward); + + assert_eq!(expected_values, actual_values, "{} forward", prefix,); + let iter_range_backward = db + .raw_iter_range::(range, ScanDirection::Backward) + .unwrap(); + let actual_values = collect_actual_values(iter_range_backward); + expected_values.reverse(); + assert_eq!(expected_values, actual_values, "{} backward", prefix); + } + + #[test] + fn raw_db_iterator_iterate() { + let tmpdir = tempfile::tempdir().unwrap(); + let db = open_db(tmpdir.path()); + + let field_1 = TestField(10); + let field_2 = TestField(20); + let field_3 = TestField(30); + let field_4 = TestField(40); + let field_5 = TestField(50); + let field_6 = TestField(60); + + db.put::(&field_3, &field_2).unwrap(); + db.put::(&field_2, &field_1).unwrap(); + db.put::(&field_4, &field_5).unwrap(); + db.put::(&field_1, &field_4).unwrap(); + db.put::(&field_5, &field_6).unwrap(); + db.put::(&field_6, &field_6).unwrap(); + + // All values, forward + let iter_forward = db.raw_iter::(ScanDirection::Forward).unwrap(); + + let actual_values = collect_actual_values(iter_forward); + let expected_values = vec![(10, 40), (20, 10), (30, 20), (40, 50), (50, 60), (60, 60)]; + assert_eq!(expected_values, actual_values, "all values, forward"); + + // All values, backward + let iter_backward = db.raw_iter::(ScanDirection::Backward).unwrap(); + let actual_values = collect_actual_values(iter_backward); + let expected_values = vec![(60, 60), (50, 60), (40, 50), (30, 20), (20, 10), (10, 40)]; + + assert_eq!(expected_values, actual_values, "all values, backward"); + + // Bounds: Ranges + let lower_bound = encode_key(&field_2); + let upper_bound = encode_key(&field_5); + + test_iterator( + &db, + "20..50", + lower_bound.clone()..upper_bound.clone(), + vec![(20, 10), (30, 20), (40, 50)], + ); + test_iterator( + &db, 
+ "20..=50", + lower_bound.clone()..=upper_bound.clone(), + vec![(20, 10), (30, 20), (40, 50), (50, 60)], + ); + test_iterator( + &db, + "15..45", + encode_key(&TestField(15))..encode_key(&TestField(45)), + vec![(20, 10), (30, 20), (40, 50)], + ); + test_iterator( + &db, + "15..=45", + encode_key(&TestField(15))..=encode_key(&TestField(45)), + vec![(20, 10), (30, 20), (40, 50)], + ); + test_iterator( + &db, + "20..", + lower_bound.clone().., + vec![(20, 10), (30, 20), (40, 50), (50, 60), (60, 60)], + ); + test_iterator( + &db, + "..50", + ..upper_bound.clone(), + vec![(10, 40), (20, 10), (30, 20), (40, 50)], + ); + test_iterator( + &db, + "..=50", + ..=upper_bound.clone(), + vec![(10, 40), (20, 10), (30, 20), (40, 50), (50, 60)], + ); + test_iterator( + &db, + "..=59", + ..encode_key(&TestField(59)), + vec![(10, 40), (20, 10), (30, 20), (40, 50), (50, 60)], + ); + test_iterator( + &db, + "..", + .., + vec![(10, 40), (20, 10), (30, 20), (40, 50), (50, 60), (60, 60)], + ); + test_iterator( + &db, + "50..=50", + upper_bound.clone()..=upper_bound.clone(), + vec![(50, 60)], + ); + test_iterator( + &db, + "outside upper 0..100", + encode_key(&TestField(100))..encode_key(&TestField(102)), + vec![], + ); + + test_iterator( + &db, + "outside lower 0..10", + encode_key(&TestField(0))..encode_key(&TestField(1)), + vec![], + ); + + { + for direction in [ScanDirection::Forward, ScanDirection::Backward] { + // Inverse + let err = db + .raw_iter_range::( + upper_bound.clone()..lower_bound.clone(), + direction.clone(), + ) + .err() + .unwrap(); + assert_eq!("lower_bound > upper_bound", err.to_string()); + + // Empty + let iter = db + .raw_iter_range::(upper_bound.clone()..upper_bound.clone(), direction) + .unwrap(); + let actual_values = collect_actual_values(iter); + assert_eq!(Vec::<(u32, u32)>::new(), actual_values); + } + } + } + + fn check_iterator_proptest(numbers: Vec) { + assert!(numbers.len() >= 10); + let tmpdir = tempfile::tempdir().unwrap(); + let db = 
open_db(tmpdir.path()); + + // Numbers + let existing_lower = numbers[2]; + let existing_upper = numbers[8]; + + let min_value = *numbers.iter().min().unwrap(); + let max_value = *numbers.iter().max().unwrap(); + let maybe_non_existing_lower_1 = existing_lower.saturating_sub(1); + let maybe_non_existing_lower_2 = existing_lower.checked_add(1).unwrap_or(u32::MAX - 2); + let maybe_non_existing_upper_1 = existing_upper.saturating_sub(1); + let maybe_non_existing_upper_2 = existing_upper.checked_add(1).unwrap_or(u32::MAX - 2); + + // Ranges will be constructed from these pairs + let range_pairs = vec![ + (min_value, max_value), + (existing_lower, existing_upper), + (min_value, existing_lower), + (min_value, existing_upper), + (existing_lower, max_value), + (existing_upper, max_value), + (existing_lower, maybe_non_existing_upper_1), + (existing_lower, maybe_non_existing_upper_2), + (maybe_non_existing_lower_1, existing_upper), + (maybe_non_existing_lower_2, existing_upper), + (maybe_non_existing_lower_1, maybe_non_existing_upper_1), + (maybe_non_existing_lower_1, maybe_non_existing_upper_2), + (maybe_non_existing_lower_2, maybe_non_existing_upper_1), + (maybe_non_existing_lower_1, maybe_non_existing_upper_2), + (maybe_non_existing_lower_1, max_value), + (maybe_non_existing_lower_2, max_value), + (min_value, maybe_non_existing_upper_1), + (min_value, maybe_non_existing_upper_2), + ]; + + for number in numbers { + db.put::(&TestField(number), &TestField(number)).unwrap(); + } + for (low, high) in range_pairs { + if low > high { + continue; + } + let low = TestField(low); + let high = TestField(high); + + let range = encode_key(&low)..encode_key(&high); + let range_inclusive = encode_key(&low)..=encode_key(&high); + let range_to = ..encode_key(&high); + let range_to_inclusive = ..=encode_key(&high); + let range_from = encode_key(&low)..; + let range_full = ..; + + check_range(&db, range); + check_range(&db, range_inclusive); + check_range(&db, range_to); + check_range(&db, 
range_to_inclusive); + check_range(&db, range_from); + check_range(&db, range_full); + } + } + + fn check_range + Clone + std::fmt::Debug>( + db: &DB, + range: R, + ) { + for direction in [ScanDirection::Forward, ScanDirection::Backward] { + let iter_range = db + .raw_iter_range::(range.clone(), direction.clone()) + .unwrap(); + validate_iterator(iter_range, range.clone(), direction.clone()); + } + } + + fn validate_iterator(iterator: I, range: R, direction: ScanDirection) + where + I: Iterator, + R: std::ops::RangeBounds + std::fmt::Debug, + { + let mut prev_key: Option = None; + for (key, _) in iterator { + assert!(range.contains(&key)); + let key = <::Key as KeyDecoder>::decode_key(&key).unwrap(); + if let Some(prev_key) = prev_key { + match direction { + ScanDirection::Forward => assert!( + key.0 >= prev_key.0, + "key={}, prev_key={} range={:?}", + key.0, + prev_key.0, + range + ), + ScanDirection::Backward => assert!( + key.0 <= prev_key.0, + "key={}, prev_key={} range={:?}", + key.0, + prev_key.0, + range + ), + }; + } + prev_key = Some(key); + } + } + + proptest! { + #[test] + fn raw_db_iterator_iterate_proptest_any_number(input in prop::collection::vec(any::(), 10..80)) { + check_iterator_proptest(input); + } + + #[test] + fn raw_db_iterator_iterate_proptest_uniq_numbers(input in prop::collection::hash_set(any::(), 10..80)) { + let input: Vec = input.into_iter().collect(); + check_iterator_proptest(input); + } + } + } +} diff --git a/src/lib.rs b/src/lib.rs index a89aaaf..df3f8cc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,9 +1,6 @@ // SPDX-License-Identifier: Apache-2.0 // Adapted from aptos-core/schemadb -#![forbid(unsafe_code)] -#![deny(missing_docs)] - //! This library implements a schematized DB on top of [RocksDB](https://rocksdb.org/). It makes //! sure all data passed in and out are structured according to predefined schemas and prevents //! access to raw keys and values. 
This library also enforces a set of specific DB options, @@ -13,23 +10,28 @@ //! families. To use this library to store a kind of key-value pairs, the user needs to use the //! [`define_schema!`] macro to define the schema name, the types of key and value, and name of the //! column family. +#![deny(missing_docs)] +#![forbid(unsafe_code)] + +pub mod cache; mod iterator; mod metrics; pub mod schema; mod schema_batch; -pub mod snapshot; + #[cfg(feature = "test-utils")] pub mod test; use std::path::Path; +use std::sync::{Arc, LockResult, RwLock, RwLockReadGuard}; use anyhow::format_err; use iterator::ScanDirection; -pub use iterator::{RawDbReverseIterator, SchemaIterator, SeekKeyEncoder}; +pub use iterator::{SchemaIterator, SeekKeyEncoder}; use metrics::{ - ROCKBOUND_BATCH_COMMIT_BYTES, ROCKBOUND_BATCH_COMMIT_LATENCY_SECONDS, ROCKBOUND_DELETES, - ROCKBOUND_GET_BYTES, ROCKBOUND_GET_LATENCY_SECONDS, ROCKBOUND_PUT_BYTES, + SCHEMADB_BATCH_COMMIT_BYTES, SCHEMADB_BATCH_COMMIT_LATENCY_SECONDS, SCHEMADB_DELETES, + SCHEMADB_GET_BYTES, SCHEMADB_GET_LATENCY_SECONDS, SCHEMADB_PUT_BYTES, }; pub use rocksdb; use rocksdb::ReadOptions; @@ -37,9 +39,10 @@ pub use rocksdb::DEFAULT_COLUMN_FAMILY_NAME; use thiserror::Error; use tracing::info; +use crate::iterator::RawDbIter; pub use crate::schema::Schema; use crate::schema::{ColumnFamilyName, KeyCodec, ValueCodec}; -pub use crate::schema_batch::{SchemaBatch, SchemaBatchIterator}; +pub use crate::schema_batch::SchemaBatch; /// This DB is a schematized RocksDB wrapper where all data passed in and out are typed according to /// [`Schema`]s. 
@@ -112,7 +115,7 @@ impl DB { } fn log_construct(name: &'static str, inner: rocksdb::DB) -> DB { - info!(rocksdb_name = name, "Opened RocksDB."); + info!(rocksdb_name = name, "Opened RocksDB"); DB { name, inner } } @@ -121,7 +124,7 @@ impl DB { &self, schema_key: &impl KeyCodec, ) -> anyhow::Result> { - let _timer = ROCKBOUND_GET_LATENCY_SECONDS + let _timer = SCHEMADB_GET_LATENCY_SECONDS .with_label_values(&[S::COLUMN_FAMILY_NAME]) .start_timer(); @@ -129,7 +132,7 @@ impl DB { let cf_handle = self.get_cf_handle(S::COLUMN_FAMILY_NAME)?; let result = self.inner.get_pinned_cf(cf_handle, k)?; - ROCKBOUND_GET_BYTES + SCHEMADB_GET_BYTES .with_label_values(&[S::COLUMN_FAMILY_NAME]) .observe(result.as_ref().map_or(0.0, |v| v.len() as f64)); @@ -195,13 +198,26 @@ impl DB { self.iter_with_direction::(Default::default(), ScanDirection::Forward) } - /// Returns a [`RawDbReverseIterator`] which allows to iterate over raw values, backwards - pub fn raw_iter(&self) -> anyhow::Result { + /// Returns a [`RawDbIter`] which allows to iterate over raw values in specified [`ScanDirection`]. + pub(crate) fn raw_iter( + &self, + direction: ScanDirection, + ) -> anyhow::Result { let cf_handle = self.get_cf_handle(S::COLUMN_FAMILY_NAME)?; - Ok(RawDbReverseIterator::new( - self.inner - .raw_iterator_cf_opt(cf_handle, Default::default()), - )) + Ok(RawDbIter::new(&self.inner, cf_handle, .., direction)) + } + + /// Get a [`RawDbIter`] in given range and direction. + pub(crate) fn raw_iter_range( + &self, + range: impl std::ops::RangeBounds, + direction: ScanDirection, + ) -> anyhow::Result { + if is_range_bounds_inverse(&range) { + anyhow::bail!("lower_bound > upper_bound"); + } + let cf_handle = self.get_cf_handle(S::COLUMN_FAMILY_NAME)?; + Ok(RawDbIter::new(&self.inner, cf_handle, range, direction)) } /// Returns a forward [`SchemaIterator`] on a certain schema with the provided read options. @@ -214,7 +230,7 @@ impl DB { /// Writes a group of records wrapped in a [`SchemaBatch`]. 
pub fn write_schemas(&self, batch: SchemaBatch) -> anyhow::Result<()> { - let _timer = ROCKBOUND_BATCH_COMMIT_LATENCY_SECONDS + let _timer = SCHEMADB_BATCH_COMMIT_LATENCY_SECONDS .with_label_values(&[self.name]) .start_timer(); let mut db_batch = rocksdb::WriteBatch::default(); @@ -236,17 +252,17 @@ impl DB { for (key, operation) in rows { match operation { Operation::Put { value } => { - ROCKBOUND_PUT_BYTES + SCHEMADB_PUT_BYTES .with_label_values(&[cf_name]) .observe((key.len() + value.len()) as f64); } Operation::Delete => { - ROCKBOUND_DELETES.with_label_values(&[cf_name]).inc(); + SCHEMADB_DELETES.with_label_values(&[cf_name]).inc(); } } } } - ROCKBOUND_BATCH_COMMIT_BYTES + SCHEMADB_BATCH_COMMIT_BYTES .with_label_values(&[self.name]) .observe(serialized_size as f64); @@ -294,9 +310,9 @@ pub type SchemaKey = Vec; /// Readability alias for a value in the DB. pub type SchemaValue = Vec; +/// Represents operation written to the database. #[cfg_attr(feature = "arbitrary", derive(proptest_derive::Arbitrary))] #[derive(Debug, PartialEq, Eq, Hash, Clone)] -/// Represents operation written to the database pub enum Operation { /// Writing a value to the DB. Put { @@ -307,6 +323,30 @@ pub enum Operation { Delete, } +impl Operation { + /// Returns [`S::Value`] if the operation is [`Operation::Put`] and `None` if [`Operation::Delete`]. 
+ fn decode_value(&self) -> anyhow::Result> { + match self { + Operation::Put { value } => { + let value = S::Value::decode_value(value)?; + Ok(Some(value)) + } + Operation::Delete => Ok(None), + } + } +} + +fn is_range_bounds_inverse(range: &impl std::ops::RangeBounds) -> bool { + match (range.start_bound(), range.end_bound()) { + (std::ops::Bound::Included(start), std::ops::Bound::Included(end)) => start > end, + (std::ops::Bound::Included(start), std::ops::Bound::Excluded(end)) => start > end, + (std::ops::Bound::Excluded(start), std::ops::Bound::Included(end)) => start > end, + (std::ops::Bound::Excluded(start), std::ops::Bound::Excluded(end)) => start > end, + (std::ops::Bound::Unbounded, _) => false, + (_, std::ops::Bound::Unbounded) => false, + } +} + /// An error that occurred during (de)serialization of a [`Schema`]'s keys or /// values. #[derive(Error, Debug)] @@ -334,6 +374,33 @@ fn default_write_options() -> rocksdb::WriteOptions { opts } +/// Wrapper around `RwLock` that only allows read access. +/// This type implies that the wrapped type is supposed to be used only for reading. +/// It is useful to indicate that the user of this type can only do reading. +/// This also implies that the inner `Arc>` is a clone and some other part can do writing. +#[derive(Debug, Clone)] +pub struct ReadOnlyLock { + lock: Arc>, +} + +impl ReadOnlyLock { + /// Create new [`ReadOnlyLock`] from [`Arc>`]. + pub fn new(lock: Arc>) -> Self { + Self { lock } + } + + /// Acquires a read lock on the underlying [`RwLock`].
+ pub fn read(&self) -> LockResult> { + self.lock.read() + } +} + +impl From>> for ReadOnlyLock { + fn from(value: Arc>) -> Self { + Self::new(value) + } +} + #[cfg(test)] mod tests { use super::*; @@ -354,4 +421,14 @@ mod tests { assert!(db_debug.contains("test_db_debug")); assert!(db_debug.contains(tmpdir.path().to_str().unwrap())); } + + #[test] + fn test_range_inverse() { + assert!(is_range_bounds_inverse(&(vec![4]..vec![3]))); + assert!(is_range_bounds_inverse(&(vec![4]..=vec![3]))); + // Not inverse, but empty + assert!(!is_range_bounds_inverse(&(vec![3]..vec![3]))); + // Not inverse + assert!(!is_range_bounds_inverse(&(vec![3]..=vec![3]))); + } } diff --git a/src/metrics.rs b/src/metrics.rs index a61de31..9bb0251 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -7,10 +7,10 @@ use prometheus::{ IntCounterVec, }; -pub static ROCKBOUND_ITER_LATENCY_SECONDS: Lazy = Lazy::new(|| { +pub static SCHEMADB_ITER_LATENCY_SECONDS: Lazy = Lazy::new(|| { register_histogram_vec!( // metric name - "rockbound_iter_latency_seconds", + "schemadb_iter_latency_seconds", // metric description "Schemadb iter latency in seconds", // metric labels (dimensions) @@ -20,10 +20,10 @@ pub static ROCKBOUND_ITER_LATENCY_SECONDS: Lazy = Lazy::new(|| { .unwrap() }); -pub static ROCKBOUND_ITER_BYTES: Lazy = Lazy::new(|| { +pub static SCHEMADB_ITER_BYTES: Lazy = Lazy::new(|| { register_histogram_vec!( // metric name - "rockbound_iter_bytes", + "schemadb_iter_bytes", // metric description "Schemadb iter size in bytes", // metric labels (dimensions) @@ -32,10 +32,10 @@ pub static ROCKBOUND_ITER_BYTES: Lazy = Lazy::new(|| { .unwrap() }); -pub static ROCKBOUND_GET_LATENCY_SECONDS: Lazy = Lazy::new(|| { +pub static SCHEMADB_GET_LATENCY_SECONDS: Lazy = Lazy::new(|| { register_histogram_vec!( // metric name - "rockbound_get_latency_seconds", + "schemadb_get_latency_seconds", // metric description "Schemadb get latency in seconds", // metric labels (dimensions) @@ -45,10 +45,10 @@ pub static 
ROCKBOUND_GET_LATENCY_SECONDS: Lazy = Lazy::new(|| { .unwrap() }); -pub static ROCKBOUND_GET_BYTES: Lazy = Lazy::new(|| { +pub static SCHEMADB_GET_BYTES: Lazy = Lazy::new(|| { register_histogram_vec!( // metric name - "rockbound_get_bytes", + "schemadb_get_bytes", // metric description "Schemadb get call returned data size in bytes", // metric labels (dimensions) @@ -57,10 +57,10 @@ pub static ROCKBOUND_GET_BYTES: Lazy = Lazy::new(|| { .unwrap() }); -pub static ROCKBOUND_BATCH_COMMIT_LATENCY_SECONDS: Lazy = Lazy::new(|| { +pub static SCHEMADB_BATCH_COMMIT_LATENCY_SECONDS: Lazy = Lazy::new(|| { register_histogram_vec!( // metric name - "rockbound_batch_commit_latency_seconds", + "schemadb_batch_commit_latency_seconds", // metric description "Schemadb schema batch commit latency in seconds", // metric labels (dimensions) @@ -70,10 +70,10 @@ pub static ROCKBOUND_BATCH_COMMIT_LATENCY_SECONDS: Lazy = Lazy::ne .unwrap() }); -pub static ROCKBOUND_BATCH_COMMIT_BYTES: Lazy = Lazy::new(|| { +pub static SCHEMADB_BATCH_COMMIT_BYTES: Lazy = Lazy::new(|| { register_histogram_vec!( // metric name - "rockbound_batch_commit_bytes", + "schemadb_batch_commit_bytes", // metric description "Schemadb schema batch commit size in bytes", // metric labels (dimensions) @@ -82,28 +82,28 @@ pub static ROCKBOUND_BATCH_COMMIT_BYTES: Lazy = Lazy::new(|| { .unwrap() }); -pub static ROCKBOUND_PUT_BYTES: Lazy = Lazy::new(|| { +pub static SCHEMADB_PUT_BYTES: Lazy = Lazy::new(|| { register_histogram_vec!( // metric name - "sov_schema_db_put_bytes", + "rockbound_put_bytes", // metric description - "sov_schema_db put call puts data size in bytes", + "rockbound put call puts data size in bytes", // metric labels (dimensions) &["cf_name"] ) .unwrap() }); -pub static ROCKBOUND_DELETES: Lazy = Lazy::new(|| { +pub static SCHEMADB_DELETES: Lazy = Lazy::new(|| { register_int_counter_vec!("storage_deletes", "Storage delete calls", &["cf_name"]).unwrap() }); -pub static ROCKBOUND_BATCH_PUT_LATENCY_SECONDS: Lazy 
= Lazy::new(|| { +pub static SCHEMADB_BATCH_PUT_LATENCY_SECONDS: Lazy = Lazy::new(|| { register_histogram_vec!( // metric name - "sov_schema_db_batch_put_latency_seconds", + "rockbound_batch_put_latency_seconds", // metric description - "sov_schema_db schema batch put latency in seconds", + "rockbound schema batch put latency in seconds", // metric labels (dimensions) &["db_name"], exponential_buckets(/*start=*/ 1e-3, /*factor=*/ 2.0, /*count=*/ 20).unwrap(), diff --git a/src/schema_batch.rs b/src/schema_batch.rs index d4e82d0..3e79108 100644 --- a/src/schema_batch.rs +++ b/src/schema_batch.rs @@ -1,16 +1,14 @@ use std::collections::{btree_map, BTreeMap, HashMap}; -use std::iter::Rev; -use crate::metrics::ROCKBOUND_BATCH_PUT_LATENCY_SECONDS; +use crate::metrics::SCHEMADB_BATCH_PUT_LATENCY_SECONDS; use crate::schema::{ColumnFamilyName, KeyCodec, ValueCodec}; use crate::{Operation, Schema, SchemaKey}; // [`SchemaBatch`] holds a collection of updates that can be applied to a DB /// ([`Schema`]) atomically. The updates will be applied in the order in which /// they are added to the [`SchemaBatch`]. 
-#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub struct SchemaBatch { - // Temporary pub(crate), before iterator is done pub(crate) last_writes: HashMap>, } @@ -26,14 +24,16 @@ impl SchemaBatch { key: &impl KeyCodec, value: &impl ValueCodec, ) -> anyhow::Result<()> { - let _timer = ROCKBOUND_BATCH_PUT_LATENCY_SECONDS + let _timer = SCHEMADB_BATCH_PUT_LATENCY_SECONDS .with_label_values(&["unknown"]) .start_timer(); + let key = key.encode_key()?; let put_operation = Operation::Put { value: value.encode_value()?, }; self.insert_operation::(key, put_operation); + Ok(()) } @@ -50,74 +50,46 @@ impl SchemaBatch { column_writes.insert(key, operation); } - pub(crate) fn read( + pub(crate) fn get( &self, key: &impl KeyCodec, ) -> anyhow::Result> { let key = key.encode_key()?; + if let Some(column_writes) = self.last_writes.get(&S::COLUMN_FAMILY_NAME) { - return Ok(column_writes.get(&key)); + Ok(column_writes.get(&key)) + } else { + Ok(None) } - Ok(None) } - /// Iterate over all the writes in the batch for a given column family in reversed lexicographic order - /// Returns None column family name does not have any writes - pub fn iter( - &self, - ) -> SchemaBatchIterator<'_, S, Rev>> { - let some_rows = self.last_writes.get(&S::COLUMN_FAMILY_NAME); - SchemaBatchIterator { - inner: some_rows.map(|rows| rows.iter().rev()), - _phantom_schema: std::marker::PhantomData, - } + /// Iterator over all values in lexicographic order. + pub fn iter(&self) -> btree_map::Iter { + self.last_writes + .get(&S::COLUMN_FAMILY_NAME) + .map(BTreeMap::iter) + .unwrap_or_default() } - /// Return iterator that iterates from operations with largest_key == upper_bound backwards + /// Iterator in given range in lexicographic order. 
pub fn iter_range( &self, - upper_bound: SchemaKey, - ) -> SchemaBatchIterator<'_, S, Rev>> { - let some_rows = self.last_writes.get(&S::COLUMN_FAMILY_NAME); - SchemaBatchIterator { - inner: some_rows.map(|rows| rows.range(..=upper_bound).rev()), - _phantom_schema: std::marker::PhantomData, - } + range: impl std::ops::RangeBounds, + ) -> btree_map::Range { + self.last_writes + .get(&S::COLUMN_FAMILY_NAME) + .map(|column_writes| column_writes.range(range)) + .unwrap_or_default() } pub(crate) fn merge(&mut self, other: SchemaBatch) { for (cf_name, other_cf_map) in other.last_writes { - let self_cf_map = self.last_writes.entry(cf_name).or_default(); - - for (key, operation) in other_cf_map { - self_cf_map.insert(key, operation); - } + let cf_map = self.last_writes.entry(cf_name).or_default(); + cf_map.extend(other_cf_map); } } } -/// Iterator over [`SchemaBatch`] for a given column family in reversed lexicographic order -pub struct SchemaBatchIterator<'a, S, I> -where - S: Schema, - I: Iterator, -{ - inner: Option, - _phantom_schema: std::marker::PhantomData, -} - -impl<'a, S, I> Iterator for SchemaBatchIterator<'a, S, I> -where - S: Schema, - I: Iterator, -{ - type Item = I::Item; - - fn next(&mut self) -> Option { - self.inner.as_mut().and_then(|inner| inner.next()) - } -} - #[cfg(feature = "arbitrary")] impl proptest::arbitrary::Arbitrary for SchemaBatch { type Parameters = &'static [ColumnFamilyName]; @@ -138,3 +110,185 @@ impl proptest::arbitrary::Arbitrary for SchemaBatch { type Strategy = proptest::strategy::BoxedStrategy; } + +#[cfg(test)] +mod tests { + use super::*; + use crate::define_schema; + use crate::schema::{KeyEncoder, ValueCodec}; + use crate::test::TestField; + + define_schema!(TestSchema1, TestField, TestField, "TestCF1"); + + mod iter { + use super::*; + use crate::schema::KeyDecoder; + + #[test] + fn empty_schema_batch_iterator() { + let batch = SchemaBatch::new(); + let mut iter_forward = batch.iter::(); + assert_eq!(None, iter_forward.next()); 
+ let mut iter_backward = batch.iter::().rev(); + assert_eq!(None, iter_backward.next()); + } + + fn collect_actual_values<'a, I: Iterator>( + iter: I, + ) -> Vec<(u32, Option)> { + iter.map(|(key, operation)| { + let key = + <::Key as KeyDecoder>::decode_key(key) + .unwrap() + .0; + let value = match operation { + Operation::Put { value } => Some( + <::Value as ValueCodec>::decode_value( + value, + ) + .unwrap() + .0, + ), + Operation::Delete => None, + }; + (key, value) + }) + .collect() + } + + #[test] + fn iterator() { + let mut batch = SchemaBatch::new(); + + let field_1 = TestField(1); + let field_2 = TestField(2); + let field_3 = TestField(3); + let field_4 = TestField(4); + + batch.put::(&field_2, &field_1).unwrap(); + batch.put::(&field_1, &field_3).unwrap(); + batch.delete::(&field_4).unwrap(); + batch.put::(&field_3, &field_4).unwrap(); + + let iter_forward = batch.iter::(); + let actual_values: Vec<(u32, Option)> = collect_actual_values(iter_forward); + let expected_values = vec![(1, Some(3)), (2, Some(1)), (3, Some(4)), (4, None)]; + assert_eq!(expected_values, actual_values); + + let iter_backward = batch.iter::().rev(); + let actual_values: Vec<(u32, Option)> = collect_actual_values(iter_backward); + let expected_values = vec![(4, None), (3, Some(4)), (2, Some(1)), (1, Some(3))]; + assert_eq!(expected_values, actual_values); + } + + fn encode_key(field: &TestField) -> SchemaKey { + >::encode_key(field).unwrap() + } + + #[test] + fn range_iterator() { + let mut batch = SchemaBatch::new(); + + let field_1 = TestField(1); + let field_2 = TestField(2); + let field_3 = TestField(3); + let field_4 = TestField(4); + let field_5 = TestField(5); + + batch.put::(&field_2, &field_1).unwrap(); + batch.put::(&field_1, &field_3).unwrap(); + batch.delete::(&field_4).unwrap(); + batch.put::(&field_5, &field_2).unwrap(); + batch.put::(&field_3, &field_4).unwrap(); + + // 2..4 + let iter_range = + batch.iter_range::(encode_key(&field_2)..encode_key(&field_4)); + let 
actual_values: Vec<(u32, Option)> = collect_actual_values(iter_range); + let mut expected_values = vec![(2, Some(1)), (3, Some(4))]; + assert_eq!(expected_values, actual_values, "2..4"); + let rev_iter_range = batch + .iter_range::(encode_key(&field_2)..encode_key(&field_4)) + .rev(); + let actual_values: Vec<(u32, Option)> = collect_actual_values(rev_iter_range); + expected_values.reverse(); + assert_eq!(expected_values, actual_values, "rev:2..4"); + + // 2.. + let iter_range = batch.iter_range::(encode_key(&field_2)..); + let actual_values: Vec<(u32, Option)> = collect_actual_values(iter_range); + let mut expected_values = vec![(2, Some(1)), (3, Some(4)), (4, None), (5, Some(2))]; + assert_eq!(expected_values, actual_values, "2.."); + let rev_iter_range = batch + .iter_range::(encode_key(&field_2)..) + .rev(); + let actual_values: Vec<(u32, Option)> = collect_actual_values(rev_iter_range); + expected_values.reverse(); + assert_eq!(expected_values, actual_values, "rev:2.."); + + // ..4 + let iter_range = batch.iter_range::(..encode_key(&field_4)); + let actual_values: Vec<(u32, Option)> = collect_actual_values(iter_range); + let mut expected_values = vec![(1, Some(3)), (2, Some(1)), (3, Some(4))]; + assert_eq!(expected_values, actual_values, "..4"); + let rev_iter_range = batch + .iter_range::(..encode_key(&field_4)) + .rev(); + let actual_values: Vec<(u32, Option)> = collect_actual_values(rev_iter_range); + expected_values.reverse(); + assert_eq!(expected_values, actual_values, "rev:..4"); + // .. 
+ let iter_range = batch.iter_range::(..); + let actual_values: Vec<(u32, Option)> = collect_actual_values(iter_range); + let mut expected_values = vec![ + (1, Some(3)), + (2, Some(1)), + (3, Some(4)), + (4, None), + (5, Some(2)), + ]; + assert_eq!(expected_values, actual_values, ".."); + let rev_iter_range = batch.iter_range::(..).rev(); + let actual_values: Vec<(u32, Option)> = collect_actual_values(rev_iter_range); + expected_values.reverse(); + assert_eq!(expected_values, actual_values, "rev:.."); + } + + #[test] + #[should_panic(expected = "range start is greater than range end in BTreeMap")] + fn inverse_range_iterator() { + let mut batch = SchemaBatch::new(); + + let field_1 = TestField(1); + let field_2 = TestField(2); + let field_3 = TestField(3); + let field_4 = TestField(4); + + batch.put::(&field_2, &field_1).unwrap(); + batch.put::(&field_1, &field_3).unwrap(); + batch.delete::(&field_4).unwrap(); + batch + .iter_range::(encode_key(&field_4)..encode_key(&field_2)) + .for_each(drop); + } + + #[test] + #[should_panic(expected = "range start is greater than range end in BTreeMap")] + fn inverse_range_rev_iterator() { + let mut batch = SchemaBatch::new(); + + let field_1 = TestField(1); + let field_2 = TestField(2); + let field_3 = TestField(3); + let field_4 = TestField(4); + + batch.put::(&field_2, &field_1).unwrap(); + batch.put::(&field_1, &field_3).unwrap(); + batch.delete::(&field_4).unwrap(); + batch + .iter_range::(encode_key(&field_4)..encode_key(&field_2)) + .rev() + .for_each(drop); + } + } +} diff --git a/src/snapshot.rs b/src/snapshot.rs deleted file mode 100644 index ee4292c..0000000 --- a/src/snapshot.rs +++ /dev/null @@ -1,508 +0,0 @@ -//! 
Snapshot related logic - -use std::collections::btree_map; -use std::iter::Rev; -use std::sync::{Arc, LockResult, Mutex, RwLock, RwLockReadGuard}; - -use crate::schema::{KeyCodec, KeyDecoder, ValueCodec}; -use crate::schema_batch::SchemaBatchIterator; -use crate::{Operation, Schema, SchemaBatch, SchemaKey, SchemaValue, SeekKeyEncoder}; - -/// Id of database snapshot -pub type SnapshotId = u64; - -/// A trait to make nested calls to several [`SchemaBatch`]s and eventually [`crate::DB`] -pub trait QueryManager { - /// Iterator over key-value pairs in reverse lexicographic order in given [`Schema`] - type Iter<'a, S: Schema>: Iterator - where - Self: 'a; - /// Iterator with given range - type RangeIter<'a, S: Schema>: Iterator - where - Self: 'a; - /// Get a value from parents of given [`SnapshotId`] - /// In case of unknown [`SnapshotId`] return `Ok(None)` - fn get( - &self, - snapshot_id: SnapshotId, - key: &impl KeyCodec, - ) -> anyhow::Result>; - - /// Returns an iterator over all key-value pairs in given [`Schema`] in reverse lexicographic order - /// Starting from given [`SnapshotId`] - fn iter(&self, snapshot_id: SnapshotId) -> anyhow::Result>; - /// Returns an iterator over all key-value pairs in given [`Schema`] in reverse lexicographic order - /// Starting from given [`SnapshotId`], where largest returned key will be less or equal to `upper_bound` - fn iter_range( - &self, - snapshot_id: SnapshotId, - upper_bound: SchemaKey, - ) -> anyhow::Result>; -} - -/// Simple wrapper around `RwLock` that only allows read access. -#[derive(Debug)] -pub struct ReadOnlyLock { - lock: Arc>, -} - -impl ReadOnlyLock { - /// Create new [`ReadOnlyLock`] from [`Arc>`]. - pub fn new(lock: Arc>) -> Self { - Self { lock } - } - - /// Acquires a read lock on the underlying `RwLock`. 
- pub fn read(&self) -> LockResult> { - self.lock.read() - } -} - -impl From>> for ReadOnlyLock { - fn from(value: Arc>) -> Self { - Self::new(value) - } -} - -/// Wrapper around [`QueryManager`] that allows to read from snapshots -#[derive(Debug)] -pub struct DbSnapshot { - id: SnapshotId, - cache: Mutex, - parents_manager: ReadOnlyLock, -} - -impl DbSnapshot { - /// Create new [`DbSnapshot`] - pub fn new(id: SnapshotId, manager: ReadOnlyLock) -> Self { - Self { - id, - cache: Mutex::new(SchemaBatch::default()), - parents_manager: manager, - } - } - - /// Store a value in snapshot - pub fn put( - &self, - key: &impl KeyCodec, - value: &impl ValueCodec, - ) -> anyhow::Result<()> { - self.cache - .lock() - .expect("Local SchemaBatch lock must not be poisoned") - .put(key, value) - } - - /// Delete given key from snapshot - pub fn delete(&self, key: &impl KeyCodec) -> anyhow::Result<()> { - self.cache - .lock() - .expect("Local SchemaBatch lock must not be poisoned") - .delete(key) - } - - /// Writes many operations at once, atomically - pub fn write_many(&self, batch: SchemaBatch) -> anyhow::Result<()> { - let mut cache = self - .cache - .lock() - .expect("Local SchemaBatch lock must not be poisoned"); - cache.merge(batch); - Ok(()) - } -} - -impl DbSnapshot { - /// Get a value from current snapshot, its parents or underlying database - pub fn read(&self, key: &impl KeyCodec) -> anyhow::Result> { - // Some(Operation) means that key was touched, - // but in case of deletion we early return None - // Only in case of not finding operation for key, - // we go deeper - - // Hold local cache lock explicitly, so reads are atomic - let local_cache = self - .cache - .lock() - .expect("SchemaBatch lock should not be poisoned"); - - // 1. Check in cache - if let Some(operation) = local_cache.read(key)? { - return decode_operation::(operation); - } - - // 2. 
Check parent - let parent = self - .parents_manager - .read() - .expect("Parent lock must not be poisoned"); - parent.get::(self.id, key) - } - - /// Get value of largest key written value for given [`Schema`] - pub fn get_largest(&self) -> anyhow::Result> { - let local_cache = self - .cache - .lock() - .expect("SchemaBatch lock must not be poisoned"); - let local_cache_iter = local_cache.iter::(); - - let parent = self - .parents_manager - .read() - .expect("Parent lock must not be poisoned"); - - let parent_iter = parent.iter::(self.id)?; - - let mut combined_iter: SnapshotIter<'_, S, _, _> = SnapshotIter { - local_cache_iter: local_cache_iter.peekable(), - parent_iter: parent_iter.peekable(), - }; - - if let Some((key, value)) = combined_iter.next() { - let key = S::Key::decode_key(&key)?; - let value = S::Value::decode_value(&value)?; - return Ok(Some((key, value))); - } - - Ok(None) - } - - /// Get largest value in [`Schema`] that is smaller or equal than give `seek_key` - pub fn get_prev( - &self, - seek_key: &impl SeekKeyEncoder, - ) -> anyhow::Result> { - let seek_key = seek_key.encode_seek_key()?; - let local_cache = self - .cache - .lock() - .expect("Local cache lock must not be poisoned"); - let local_cache_iter = local_cache.iter_range::(seek_key.clone()); - - let parent = self - .parents_manager - .read() - .expect("Parent snapshots lock must not be poisoned"); - let parent_iter = parent.iter_range::(self.id, seek_key.clone())?; - - let mut combined_iter: SnapshotIter<'_, S, _, _> = SnapshotIter { - local_cache_iter: local_cache_iter.peekable(), - parent_iter: parent_iter.peekable(), - }; - - if let Some((key, value)) = combined_iter.next() { - let key = S::Key::decode_key(&key)?; - let value = S::Value::decode_value(&value)?; - return Ok(Some((key, value))); - } - Ok(None) - } -} - -struct SnapshotIter<'a, S, LocalIter, ParentIter> -where - S: Schema, - LocalIter: Iterator, - ParentIter: Iterator, -{ - local_cache_iter: std::iter::Peekable>, - 
parent_iter: std::iter::Peekable, -} - -impl<'a, S, LocalIter, ParentIter> Iterator for SnapshotIter<'a, S, LocalIter, ParentIter> -where - S: Schema, - LocalIter: Iterator, - ParentIter: Iterator, -{ - type Item = (SchemaKey, SchemaValue); - - fn next(&mut self) -> Option { - loop { - let local_cache_peeked = self.local_cache_iter.peek(); - let parent_peeked = self.parent_iter.peek(); - - match (local_cache_peeked, parent_peeked) { - // Both iterators exhausted - (None, None) => break, - // Parent exhausted (just like me on friday) - (Some(&(key, operation)), None) => { - self.local_cache_iter.next(); - let next = put_or_none(key, operation); - if next.is_none() { - continue; - } - return next; - } - // Local exhausted - (None, Some((_key, _value))) => { - return self.parent_iter.next(); - } - // Both are active, need to compare keys - (Some(&(local_key, local_operation)), Some((parent_key, _parent_value))) => { - return if local_key < parent_key { - self.parent_iter.next() - } else { - // Local is preferable, as it is the latest - // But both operators must succeed - if local_key == parent_key { - self.parent_iter.next(); - } - self.local_cache_iter.next(); - let next = put_or_none(local_key, local_operation); - if next.is_none() { - continue; - } - next - }; - } - } - } - - None - } -} - -/// Read only version of [`DbSnapshot`], for usage inside [`QueryManager`] -pub struct ReadOnlyDbSnapshot { - id: SnapshotId, - cache: SchemaBatch, -} - -impl ReadOnlyDbSnapshot { - /// Get value from its own cache - pub fn get(&self, key: &impl KeyCodec) -> anyhow::Result> { - self.cache.read(key) - } - - /// Get id of this Snapshot - pub fn get_id(&self) -> SnapshotId { - self.id - } - - /// Iterate over all operations in snapshot in reversed lexicographic order - pub fn iter( - &self, - ) -> SchemaBatchIterator<'_, S, Rev>> { - self.cache.iter::() - } - - /// Iterate over all operations in snapshot in reversed lexicographical order, starting from `upper_bound` - pub fn 
iter_range( - &self, - upper_bound: SchemaKey, - ) -> SchemaBatchIterator<'_, S, Rev>> { - self.cache.iter_range::(upper_bound) - } -} - -impl From> for ReadOnlyDbSnapshot { - fn from(snapshot: DbSnapshot) -> Self { - Self { - id: snapshot.id, - cache: snapshot - .cache - .into_inner() - .expect("SchemaBatch lock must not be poisoned"), - } - } -} - -impl From for SchemaBatch { - fn from(value: ReadOnlyDbSnapshot) -> Self { - value.cache - } -} - -fn decode_operation(operation: &Operation) -> anyhow::Result> { - match operation { - Operation::Put { value } => { - let value = S::Value::decode_value(value)?; - Ok(Some(value)) - } - Operation::Delete => Ok(None), - } -} - -fn put_or_none(key: &SchemaKey, operation: &Operation) -> Option<(SchemaKey, SchemaValue)> { - if let Operation::Put { value } = operation { - return Some((key.to_vec(), value.to_vec())); - } - None -} - -/// QueryManager, which never returns any values -#[derive(Clone, Debug, Default)] -pub struct NoopQueryManager; - -impl QueryManager for NoopQueryManager { - type Iter<'a, S: Schema> = std::iter::Empty<(SchemaKey, SchemaValue)>; - type RangeIter<'a, S: Schema> = std::iter::Empty<(SchemaKey, SchemaValue)>; - - fn get( - &self, - _snapshot_id: SnapshotId, - _key: &impl KeyCodec, - ) -> anyhow::Result> { - Ok(None) - } - - fn iter(&self, _snapshot_id: SnapshotId) -> anyhow::Result> { - Ok(std::iter::empty()) - } - - fn iter_range( - &self, - _snapshot_id: SnapshotId, - _upper_bound: SchemaKey, - ) -> anyhow::Result> { - Ok(std::iter::empty()) - } -} - -/// Snapshot manager, where all snapshots are collapsed into 1 -#[derive(Default)] -pub struct SingleSnapshotQueryManager { - cache: SchemaBatch, -} - -impl SingleSnapshotQueryManager { - /// Adding new snapshot. It will override any existing data on key match - pub fn add_snapshot(&mut self, snapshot: ReadOnlyDbSnapshot) { - let ReadOnlyDbSnapshot { - cache: new_data, .. 
- } = snapshot; - - self.cache.merge(new_data); - } -} - -impl QueryManager for SingleSnapshotQueryManager { - type Iter<'a, S: Schema> = std::vec::IntoIter<(SchemaKey, SchemaValue)>; - type RangeIter<'a, S: Schema> = std::vec::IntoIter<(SchemaKey, SchemaValue)>; - - fn get( - &self, - _snapshot_id: SnapshotId, - key: &impl KeyCodec, - ) -> anyhow::Result> { - if let Some(Operation::Put { value }) = self.cache.read(key)? { - let value = S::Value::decode_value(value)?; - return Ok(Some(value)); - } - Ok(None) - } - - fn iter(&self, _snapshot_id: SnapshotId) -> anyhow::Result> { - let collected: Vec<(SchemaKey, SchemaValue)> = self - .cache - .iter::() - .filter_map(|(k, op)| match op { - Operation::Put { value } => Some((k.to_vec(), value.to_vec())), - Operation::Delete => None, - }) - .collect(); - - Ok(collected.into_iter()) - } - - fn iter_range( - &self, - _snapshot_id: SnapshotId, - upper_bound: SchemaKey, - ) -> anyhow::Result> { - let collected: Vec<(SchemaKey, SchemaValue)> = self - .cache - .iter_range::(upper_bound) - .filter_map(|(k, op)| match op { - Operation::Put { value } => Some((k.to_vec(), value.to_vec())), - Operation::Delete => None, - }) - .collect(); - - Ok(collected.into_iter()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::define_schema; - use crate::schema::KeyEncoder; - use crate::test::{TestCompositeField, TestField}; - - define_schema!(TestSchema, TestCompositeField, TestField, "TestCF"); - - fn encode_key(key: &TestCompositeField) -> Vec { - >::encode_key(key).unwrap() - } - - fn encode_value(value: &TestField) -> Vec { - >::encode_value(value).unwrap() - } - - #[test] - fn test_db_snapshot_iterator_empty() { - let local_cache = SchemaBatch::new(); - let parent_values = SchemaBatch::new(); - - let manager = SingleSnapshotQueryManager { - cache: parent_values, - }; - - let local_cache_iter = local_cache.iter::().peekable(); - let manager_iter = manager.iter::(0).unwrap().peekable(); - - let snapshot_iter = 
SnapshotIter::<'_, TestSchema, _, _> { - local_cache_iter, - parent_iter: manager_iter, - }; - - let values: Vec<(SchemaKey, SchemaValue)> = snapshot_iter.collect(); - - assert!(values.is_empty()); - } - - #[test] - fn test_db_snapshot_iterator_values() { - let k1 = TestCompositeField(0, 1, 0); - let k2 = TestCompositeField(0, 1, 2); - let k3 = TestCompositeField(3, 1, 0); - let k4 = TestCompositeField(3, 2, 0); - - let mut parent_values = SchemaBatch::new(); - parent_values.put::(&k2, &TestField(2)).unwrap(); - parent_values.put::(&k1, &TestField(1)).unwrap(); - parent_values.put::(&k4, &TestField(4)).unwrap(); - parent_values.put::(&k3, &TestField(3)).unwrap(); - - let mut local_cache = SchemaBatch::new(); - local_cache.delete::(&k3).unwrap(); - local_cache.put::(&k1, &TestField(10)).unwrap(); - local_cache.put::(&k2, &TestField(20)).unwrap(); - - let manager = SingleSnapshotQueryManager { - cache: parent_values, - }; - - let local_cache_iter = local_cache.iter::().peekable(); - let manager_iter = manager.iter::(0).unwrap().peekable(); - - let snapshot_iter = SnapshotIter::<'_, TestSchema, _, _> { - local_cache_iter, - parent_iter: manager_iter, - }; - - let actual_values: Vec<(SchemaKey, SchemaValue)> = snapshot_iter.collect(); - let expected_values = vec![ - (encode_key(&k4), encode_value(&TestField(4))), - (encode_key(&k2), encode_value(&TestField(20))), - (encode_key(&k1), encode_value(&TestField(10))), - ]; - - assert_eq!(expected_values, actual_values); - } -} diff --git a/src/test.rs b/src/test.rs index fc88ee5..f3e6635 100644 --- a/src/test.rs +++ b/src/test.rs @@ -1,4 +1,4 @@ -//! Helpers structures for testing, such as fields. +//! 
Helpers structures for testing, such as fields use anyhow::Result; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; @@ -114,3 +114,36 @@ impl SeekKeyEncoder for KeyPrefix2 { Ok(bytes) } } + +#[cfg(feature = "arbitrary")] +impl proptest::arbitrary::Arbitrary for TestField { + type Parameters = std::ops::Range; + + fn arbitrary_with(args: Self::Parameters) -> Self::Strategy { + use proptest::prelude::any; + use proptest::strategy::Strategy; + + any::() + .prop_filter("Value should be in range", move |v| args.contains(v)) + .prop_map(TestField) + .boxed() + } + + type Strategy = proptest::strategy::BoxedStrategy; +} + +#[cfg(feature = "arbitrary")] +impl proptest::arbitrary::Arbitrary for TestCompositeField { + type Parameters = (); + + fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy { + use proptest::prelude::any; + use proptest::strategy::Strategy; + + (any::(), any::(), any::()) + .prop_map(|(a, b, c)| TestCompositeField(a, b, c)) + .boxed() + } + + type Strategy = proptest::strategy::BoxedStrategy; +} diff --git a/tests/iterator_test.rs b/tests/iterator_test.rs index 0dbfb5b..e49d885 100644 --- a/tests/iterator_test.rs +++ b/tests/iterator_test.rs @@ -1,13 +1,15 @@ // Copyright (c) Aptos // SPDX-License-Identifier: Apache-2.0 +use std::collections::HashMap; use std::sync::{Arc, RwLock}; +use rockbound::cache::cache_container::CacheContainer; +use rockbound::cache::cache_db::CacheDb; use rockbound::schema::{KeyDecoder, KeyEncoder, ValueCodec}; -use rockbound::snapshot::{DbSnapshot, ReadOnlyLock, SingleSnapshotQueryManager}; use rockbound::test::{KeyPrefix1, KeyPrefix2, TestCompositeField, TestField}; use rockbound::{ - define_schema, Operation, Schema, SchemaBatch, SchemaIterator, SeekKeyEncoder, DB, + define_schema, Operation, ReadOnlyLock, Schema, SchemaBatch, SchemaIterator, SeekKeyEncoder, DB, }; use rocksdb::DEFAULT_COLUMN_FAMILY_NAME; use tempfile::TempDir; @@ -37,14 +39,18 @@ struct TestDB { db: DB, } +fn open_inner_db(path: 
&std::path::Path) -> DB { + let column_families = vec![DEFAULT_COLUMN_FAMILY_NAME, S::COLUMN_FAMILY_NAME]; + let mut db_opts = rocksdb::Options::default(); + db_opts.create_if_missing(true); + db_opts.create_missing_column_families(true); + DB::open(path, "test-iterator-db", column_families, &db_opts).unwrap() +} + impl TestDB { fn new() -> Self { let tmpdir = tempfile::tempdir().unwrap(); - let column_families = vec![DEFAULT_COLUMN_FAMILY_NAME, S::COLUMN_FAMILY_NAME]; - let mut db_opts = rocksdb::Options::default(); - db_opts.create_if_missing(true); - db_opts.create_missing_column_families(true); - let db = DB::open(tmpdir.path(), "test", column_families, &db_opts).unwrap(); + let db = open_inner_db(tmpdir.path()); db.put::(&TestCompositeField(1, 0, 0), &TestField(100)) .unwrap(); @@ -244,7 +250,7 @@ fn test_schema_batch_iteration_order() { batch.put::(key, value).unwrap(); } - let iter = batch.iter::(); + let iter = batch.iter::().rev(); let collected: Vec<_> = iter .filter_map(|(key, value)| match value { Operation::Put { value } => Some(( @@ -271,7 +277,7 @@ fn test_schema_batch_iteration_with_deletions() { batch .put::(&TestCompositeField(1, 0, 0), &TestField(2)) .unwrap(); - let mut iter = batch.iter::().peekable(); + let mut iter = batch.iter::().rev().peekable(); let first1 = iter.peek().unwrap(); assert_eq!(first1.0, &encode_key(&TestCompositeField(12, 0, 0))); assert_eq!( @@ -310,7 +316,7 @@ fn test_schema_batch_iter_range() { >::encode_seek_key(&TestCompositeField(11, 0, 0)) .unwrap(); - let mut iter = batch.iter_range::(seek_key); + let mut iter = batch.iter_range::(..=seek_key).rev(); assert_eq!( Some(( @@ -351,11 +357,22 @@ fn test_schema_batch_iter_range() { #[test] fn test_db_snapshot_get_last_value() { - let manager = Arc::new(RwLock::new(SingleSnapshotQueryManager::default())); + let db = TestDB::new(); + let to_parent = Arc::new(RwLock::new(HashMap::new())); + { + let mut to_parent = to_parent.write().unwrap(); + to_parent.insert(1, 0); + } + 
let manager = Arc::new(RwLock::new(CacheContainer::new(db.db, to_parent.into()))); - let snapshot_1 = DbSnapshot::new(0, ReadOnlyLock::new(manager.clone())); + let snapshot_1 = CacheDb::new(0, ReadOnlyLock::new(manager.clone())); - assert!(snapshot_1.get_largest::().unwrap().is_none()); + { + let (largest_key_in_db, largest_value_in_db) = + snapshot_1.get_largest::().unwrap().unwrap(); + assert_eq!(TestCompositeField(2, 0, 2), largest_key_in_db); + assert_eq!(TestField(202), largest_value_in_db); + } let key_1 = TestCompositeField(8, 2, 3); let value_1 = TestField(6); @@ -363,20 +380,20 @@ fn test_db_snapshot_get_last_value() { snapshot_1.put::(&key_1, &value_1).unwrap(); { - let (latest_key, latest_value) = snapshot_1 + let (largest_key, largest_value) = snapshot_1 .get_largest::() .unwrap() .expect("largest key-value pair should be found"); - assert_eq!(key_1, latest_key); - assert_eq!(value_1, latest_value); + assert_eq!(key_1, largest_key); + assert_eq!(value_1, largest_value); } { let mut manager = manager.write().unwrap(); - manager.add_snapshot(snapshot_1.into()); + manager.add_snapshot(snapshot_1.into()).unwrap(); } - let snapshot_2 = DbSnapshot::new(1, ReadOnlyLock::new(manager.clone())); + let snapshot_2 = CacheDb::new(1, ReadOnlyLock::new(manager)); { let (latest_key, latest_value) = snapshot_2 @@ -425,11 +442,19 @@ fn test_db_snapshot_get_last_value() { } #[test] -fn test_db_snapshot_get_prev_value() { - let manager = Arc::new(RwLock::new(SingleSnapshotQueryManager::default())); +fn test_db_cache_container_get_prev_value() { + let tmpdir = tempfile::tempdir().unwrap(); + let db = open_inner_db(tmpdir.path()); + let to_parent = Arc::new(RwLock::new(HashMap::new())); + { + let mut to_parent = to_parent.write().unwrap(); + to_parent.insert(1, 0); + to_parent.insert(2, 1); + } + let cache_container = Arc::new(RwLock::new(CacheContainer::new(db, to_parent.into()))); // Snapshots 1 and 2 are to black box usages of parents iterator - let snapshot_1 = 
DbSnapshot::new(0, ReadOnlyLock::new(manager.clone())); + let snapshot_1 = CacheDb::new(0, ReadOnlyLock::new(cache_container.clone())); let key_1 = TestCompositeField(8, 2, 3); let key_2 = TestCompositeField(8, 2, 0); @@ -465,11 +490,11 @@ fn test_db_snapshot_get_prev_value() { ); { - let mut manager = manager.write().unwrap(); - manager.add_snapshot(snapshot_1.into()); + let mut manager = cache_container.write().unwrap(); + manager.add_snapshot(snapshot_1.into()).unwrap(); } - let snapshot_2 = DbSnapshot::new(1, ReadOnlyLock::new(manager.clone())); + let snapshot_2 = CacheDb::new(1, ReadOnlyLock::new(cache_container.clone())); // Equal: assert_eq!( (key_1.clone(), TestField(1)), @@ -503,10 +528,10 @@ fn test_db_snapshot_get_prev_value() { snapshot_2.get_prev::(&key_1).unwrap().unwrap() ); { - let mut manager = manager.write().unwrap(); - manager.add_snapshot(snapshot_2.into()); + let mut manager = cache_container.write().unwrap(); + manager.add_snapshot(snapshot_2.into()).unwrap(); } - let snapshot_3 = DbSnapshot::new(2, ReadOnlyLock::new(manager.clone())); + let snapshot_3 = CacheDb::new(2, ReadOnlyLock::new(cache_container)); assert_eq!( (key_2.clone(), TestField(20)), snapshot_3 @@ -515,7 +540,7 @@ fn test_db_snapshot_get_prev_value() { .unwrap() ); assert_eq!( - (key_2.clone(), TestField(20)), + (key_2, TestField(20)), snapshot_3.get_prev::(&key_1).unwrap().unwrap() ); assert_eq!( diff --git a/tests/snapshot_test.rs b/tests/snapshot_test.rs index 00376a2..6594d6e 100644 --- a/tests/snapshot_test.rs +++ b/tests/snapshot_test.rs @@ -1,21 +1,49 @@ +use std::collections::HashMap; +use std::path::Path; use std::sync::{Arc, RwLock}; -use rockbound::define_schema; -use rockbound::snapshot::{DbSnapshot, ReadOnlyLock, SingleSnapshotQueryManager}; +use rockbound::cache::cache_container::CacheContainer; +use rockbound::cache::cache_db::CacheDb; +use rockbound::schema::ColumnFamilyName; use rockbound::test::TestField; +use rockbound::{define_schema, ReadOnlyLock, Schema, 
DB}; +use rocksdb::DEFAULT_COLUMN_FAMILY_NAME; define_schema!(TestSchema1, TestField, TestField, "TestCF1"); type S = TestSchema1; +fn get_column_families() -> Vec { + vec![DEFAULT_COLUMN_FAMILY_NAME, S::COLUMN_FAMILY_NAME] +} + +fn open_db(dir: impl AsRef) -> DB { + let mut db_opts = rocksdb::Options::default(); + db_opts.create_if_missing(true); + db_opts.create_missing_column_families(true); + DB::open(dir, "test", get_column_families(), &db_opts).expect("Failed to open DB.") +} + #[test] fn snapshot_lifecycle() { - let manager = Arc::new(RwLock::new(SingleSnapshotQueryManager::default())); + let tmpdir = tempfile::tempdir().unwrap(); + let db = open_db(&tmpdir); + + let to_parent = Arc::new(RwLock::new(HashMap::new())); + { + let mut to_parent = to_parent.write().unwrap(); + to_parent.insert(1, 0); + to_parent.insert(2, 1); + } + let manager = Arc::new(RwLock::new(CacheContainer::new( + db, + to_parent.clone().into(), + ))); let key = TestField(1); let value = TestField(1); - let snapshot_1 = DbSnapshot::new(0, ReadOnlyLock::new(manager.clone())); + let snapshot_1 = CacheDb::new(0, ReadOnlyLock::new(manager.clone())); assert_eq!( None, snapshot_1.read::(&key).unwrap(), @@ -30,20 +58,26 @@ fn snapshot_lifecycle() { ); { let mut manager = manager.write().unwrap(); - manager.add_snapshot(snapshot_1.into()); + manager.add_snapshot(snapshot_1.into()).unwrap(); } // Snapshot 2: reads value from snapshot 1, then deletes it - let snapshot_2 = DbSnapshot::new(1, ReadOnlyLock::new(manager.clone())); + let snapshot_2 = CacheDb::new(1, ReadOnlyLock::new(manager.clone())); assert_eq!(Some(value), snapshot_2.read::(&key).unwrap()); snapshot_2.delete::(&key).unwrap(); assert_eq!(None, snapshot_2.read::(&key).unwrap()); { let mut manager = manager.write().unwrap(); - manager.add_snapshot(snapshot_2.into()); + manager.add_snapshot(snapshot_2.into()).unwrap(); + let mut to_parent = to_parent.write().unwrap(); + to_parent.insert(1, 0); } // Snapshot 3: gets empty result, event 
value is in some previous snapshots - let snapshot_3 = DbSnapshot::new(2, ReadOnlyLock::new(manager.clone())); + let snapshot_3 = CacheDb::new(2, ReadOnlyLock::new(manager)); + { + let mut to_parent = to_parent.write().unwrap(); + to_parent.insert(2, 1); + } assert_eq!(None, snapshot_3.read::(&key).unwrap()); }