diff --git a/full-node/db/sov-db/src/native_db.rs b/full-node/db/sov-db/src/native_db.rs
index 7cb3dceec..97e627b23 100644
--- a/full-node/db/sov-db/src/native_db.rs
+++ b/full-node/db/sov-db/src/native_db.rs
@@ -48,7 +48,7 @@ impl NativeDB {
         &self,
         key_value_pairs: impl IntoIterator<Item = (Vec<u8>, Option<Vec<u8>>)>,
     ) -> anyhow::Result<()> {
-        let batch = SchemaBatch::default();
+        let mut batch = SchemaBatch::default();
         for (key, value) in key_value_pairs {
             batch.put::<ModuleAccessoryState>(&key, &value)?;
         }
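`SchemaBatch` no longer relies on interior mutability (the `Mutex` around its rows is removed in the `lib.rs` changes below), so `put` and `delete` now take `&mut self` and callers such as `set_values` must declare the batch `mut`. A minimal sketch of the new calling convention, assuming the `TestSchema1`/`TestField` definitions from the test file at the end of this diff are in scope:

```rust
use sov_schema_db::SchemaBatch;

// `TestSchema1` / `TestField` are assumed here, defined exactly as in
// tests/snapshot_test.rs at the end of this diff.
fn build_batch() -> anyhow::Result<SchemaBatch> {
    // The internal Mutex is gone, so the batch itself must be mutable.
    let mut batch = SchemaBatch::default();
    batch.put::<TestSchema1>(&TestField(1), &TestField(10))?;
    batch.delete::<TestSchema1>(&TestField(2))?;
    Ok(batch)
}
```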
diff --git a/full-node/db/sov-schema-db/src/lib.rs b/full-node/db/sov-schema-db/src/lib.rs
index b4a286cb8..7cd85d261 100644
--- a/full-node/db/sov-schema-db/src/lib.rs
+++ b/full-node/db/sov-schema-db/src/lib.rs
@@ -17,10 +17,10 @@ mod iterator;
 mod metrics;
 pub mod schema;
+pub mod snapshot;
 
 use std::collections::HashMap;
 use std::path::Path;
-use std::sync::Mutex;
 
 use anyhow::format_err;
 use iterator::ScanDirection;
@@ -144,7 +144,7 @@ impl DB {
     ) -> anyhow::Result<()> {
         // Not necessary to use a batch, but we'd like a central place to bump counters.
         // Used in tests only anyway.
-        let batch = SchemaBatch::new();
+        let mut batch = SchemaBatch::new();
         batch.put::<S>(key, value)?;
         self.write_schemas(batch)
     }
@@ -179,15 +179,13 @@ impl DB {
         let _timer = SCHEMADB_BATCH_COMMIT_LATENCY_SECONDS
             .with_label_values(&[self.name])
             .start_timer();
-        let rows_locked = batch.rows.lock().expect("Lock must not be poisoned");
-
         let mut db_batch = rocksdb::WriteBatch::default();
-        for (cf_name, rows) in rows_locked.iter() {
+        for (cf_name, rows) in batch.last_writes.iter() {
             let cf_handle = self.get_cf_handle(cf_name)?;
-            for write_op in rows {
-                match write_op {
-                    WriteOp::Value { key, value } => db_batch.put_cf(cf_handle, key, value),
-                    WriteOp::Deletion { key } => db_batch.delete_cf(cf_handle, key),
+            for (key, operation) in rows {
+                match operation {
+                    Operation::Put { value } => db_batch.put_cf(cf_handle, key, value),
+                    Operation::Delete => db_batch.delete_cf(cf_handle, key),
                 }
             }
         }
@@ -196,15 +194,15 @@ impl DB {
         self.inner.write_opt(db_batch, &default_write_options())?;
 
         // Bump counters only after DB write succeeds.
-        for (cf_name, rows) in rows_locked.iter() {
-            for write_op in rows {
-                match write_op {
-                    WriteOp::Value { key, value } => {
+        for (cf_name, rows) in batch.last_writes.iter() {
+            for (key, operation) in rows {
+                match operation {
+                    Operation::Put { value } => {
                         SCHEMADB_PUT_BYTES
                             .with_label_values(&[cf_name])
                             .observe((key.len() + value.len()) as f64);
                     }
-                    WriteOp::Deletion { key: _ } => {
+                    Operation::Delete => {
                         SCHEMADB_DELETES.with_label_values(&[cf_name]).inc();
                     }
                 }
@@ -253,11 +251,20 @@ impl DB {
     }
 }
 
+type SchemaKey = Vec<u8>;
+type SchemaValue = Vec<u8>;
+
 #[cfg_attr(feature = "arbitrary", derive(proptest_derive::Arbitrary))]
-#[derive(Debug, PartialEq, Eq, Hash)]
-enum WriteOp {
-    Value { key: Vec<u8>, value: Vec<u8> },
-    Deletion { key: Vec<u8> },
+#[derive(Debug, PartialEq, Eq, Hash, Clone)]
+/// Represents an operation written to the database
+pub enum Operation {
+    /// Writing a value to the DB.
+    Put {
+        /// Value to write
+        value: SchemaValue,
+    },
+    /// Deleting a value
+    Delete,
 }
 
 /// [`SchemaBatch`] holds a collection of updates that can be applied to a DB
@@ -265,7 +272,7 @@ enum WriteOp {
 /// they are added to the [`SchemaBatch`].
 #[derive(Debug, Default)]
 pub struct SchemaBatch {
-    rows: Mutex<HashMap<ColumnFamilyName, Vec<WriteOp>>>,
+    last_writes: HashMap<ColumnFamilyName, HashMap<SchemaKey, Operation>>,
 }
 
 impl SchemaBatch {
@@ -276,7 +283,7 @@ impl SchemaBatch {
     /// Adds an insert/update operation to the batch.
     pub fn put<S: Schema>(
-        &self,
+        &mut self,
         key: &impl KeyCodec<S>,
         value: &impl ValueCodec<S>,
     ) -> anyhow::Result<()> {
@@ -284,52 +291,58 @@ impl SchemaBatch {
             .with_label_values(&["unknown"])
             .start_timer();
         let key = key.encode_key()?;
-        let value = value.encode_value()?;
-        self.rows
-            .lock()
-            .expect("Lock must not be poisoned")
-            .entry(S::COLUMN_FAMILY_NAME)
-            .or_default()
-            .push(WriteOp::Value { key, value });
-
+        let put_operation = Operation::Put {
+            value: value.encode_value()?,
+        };
+        self.insert_operation::<S>(key, put_operation);
         Ok(())
     }
 
     /// Adds a delete operation to the batch.
-    pub fn delete<S: Schema>(&self, key: &impl KeyCodec<S>) -> anyhow::Result<()> {
+    pub fn delete<S: Schema>(&mut self, key: &impl KeyCodec<S>) -> anyhow::Result<()> {
         let key = key.encode_key()?;
-        self.rows
-            .lock()
-            .expect("Lock must not be poisoned")
-            .entry(S::COLUMN_FAMILY_NAME)
-            .or_default()
-            .push(WriteOp::Deletion { key });
+        self.insert_operation::<S>(key, Operation::Delete);
         Ok(())
     }
+
+    fn insert_operation<S: Schema>(&mut self, key: SchemaKey, operation: Operation) {
+        let column_writes = self.last_writes.entry(S::COLUMN_FAMILY_NAME).or_default();
+        column_writes.insert(key, operation);
+    }
+
+    #[allow(dead_code)]
+    pub(crate) fn read<S: Schema>(
+        &self,
+        key: &impl KeyCodec<S>,
+    ) -> anyhow::Result<Option<Operation>> {
+        let key = key.encode_key()?;
+        if let Some(column_writes) = self.last_writes.get(&S::COLUMN_FAMILY_NAME) {
+            return Ok(column_writes.get(&key).cloned());
+        }
+        Ok(None)
+    }
 }
 
 #[cfg(feature = "arbitrary")]
 impl proptest::arbitrary::Arbitrary for SchemaBatch {
     type Parameters = &'static [ColumnFamilyName];
-    type Strategy = proptest::strategy::BoxedStrategy<Self>;
-
     fn arbitrary_with(columns: Self::Parameters) -> Self::Strategy {
         use proptest::prelude::any;
         use proptest::strategy::Strategy;
 
-        proptest::collection::vec(any::<Vec<WriteOp>>(), columns.len())
+        proptest::collection::vec(any::<HashMap<SchemaKey, Operation>>(), columns.len())
             .prop_map::<SchemaBatch, _>(|vec_vec_write_ops| {
                 let mut rows = HashMap::new();
-                for (col, write_ops) in columns.iter().zip(vec_vec_write_ops.into_iter()) {
-                    rows.insert(*col, write_ops);
-                }
-                SchemaBatch {
-                    rows: Mutex::new(rows),
+                for (col, write_op) in columns.iter().zip(vec_vec_write_ops.into_iter()) {
+                    rows.insert(*col, write_op);
                 }
+                SchemaBatch { last_writes: rows }
             })
             .boxed()
     }
+
+    type Strategy = proptest::strategy::BoxedStrategy<Self>;
 }
 
 /// An error that occurred during (de)serialization of a [`Schema`]'s keys or
@@ -358,3 +371,25 @@ fn default_write_options() -> rocksdb::WriteOptions {
     opts.set_sync(true);
     opts
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_db_debug_output() {
+        let tmpdir = tempfile::tempdir().unwrap();
+        let column_families = vec![DEFAULT_COLUMN_FAMILY_NAME];
+
+        let mut db_opts = rocksdb::Options::default();
+        db_opts.create_if_missing(true);
+        db_opts.create_missing_column_families(true);
+
+        let db = DB::open(tmpdir.path(), "test_db_debug", column_families, &db_opts)
+            .expect("Failed to open DB.");
+
+        let db_debug = format!("{:?}", db);
+        assert!(db_debug.contains("test_db_debug"));
+        assert!(db_debug.contains(tmpdir.path().to_str().unwrap()));
+    }
+}
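Besides dropping the `Mutex`, the rework changes the batch layout from a `Vec<WriteOp>` per column family to a `HashMap<SchemaKey, Operation>`, so repeated writes to the same key collapse into the last operation instead of being replayed in order. Per key the final state is unchanged (RocksDB's `WriteBatch` also lets the last write win), but only one operation per key now reaches the database. A sketch, again assuming the `TestSchema1`/`TestField` definitions from the tests at the end of this diff:

```rust
use sov_schema_db::SchemaBatch;

fn last_write_wins() -> anyhow::Result<SchemaBatch> {
    let mut batch = SchemaBatch::new();
    // Previously all three operations were stored and replayed in order;
    // now the map keeps only the final `Operation::Delete` for key 0.
    batch.put::<TestSchema1>(&TestField(0), &TestField(1))?;
    batch.put::<TestSchema1>(&TestField(0), &TestField(2))?;
    batch.delete::<TestSchema1>(&TestField(0))?;
    Ok(batch)
}
```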
diff --git a/full-node/db/sov-schema-db/src/snapshot.rs b/full-node/db/sov-schema-db/src/snapshot.rs
new file mode 100644
index 000000000..6bfbe3f7b
--- /dev/null
+++ b/full-node/db/sov-schema-db/src/snapshot.rs
@@ -0,0 +1,146 @@
+//! Snapshot-related logic
+
+use std::sync::{Arc, LockResult, Mutex, RwLock, RwLockReadGuard};
+
+use crate::schema::{KeyCodec, ValueCodec};
+use crate::{Operation, Schema, SchemaBatch};
+
+/// Id of database snapshot
+pub type SnapshotId = u64;
+
+/// A trait to make nested calls to several [`SchemaBatch`]s and eventually [`crate::DB`]
+pub trait QueryManager {
+    /// Get a value from snapshot or its parents
+    fn get<S: Schema>(
+        &self,
+        snapshot_id: SnapshotId,
+        key: &impl KeyCodec<S>,
+    ) -> anyhow::Result<Option<S::Value>>;
+}
+
+/// Simple wrapper around `RwLock` that only allows read access.
+pub struct ReadOnlyLock<T> {
+    lock: Arc<RwLock<T>>,
+}
+
+impl<T> ReadOnlyLock<T> {
+    /// Create a new [`ReadOnlyLock`] from an [`Arc<RwLock<T>>`].
+    pub fn new(lock: Arc<RwLock<T>>) -> Self {
+        Self { lock }
+    }
+
+    /// Acquires a read lock on the underlying `RwLock`.
+    pub fn read(&self) -> LockResult<RwLockReadGuard<'_, T>> {
+        self.lock.read()
+    }
+}
+
+/// Wrapper around [`QueryManager`] that allows reading from snapshots
+pub struct DbSnapshot<Q: QueryManager> {
+    id: SnapshotId,
+    cache: Mutex<SchemaBatch>,
+    parents_manager: ReadOnlyLock<Q>,
+}
+
+impl<Q: QueryManager> DbSnapshot<Q> {
+    /// Create a new [`DbSnapshot`]
+    pub fn new(id: SnapshotId, manager: ReadOnlyLock<Q>) -> Self {
+        Self {
+            id,
+            cache: Mutex::new(SchemaBatch::default()),
+            parents_manager: manager,
+        }
+    }
+
+    /// Get a value from the current snapshot, its parents, or the underlying database
+    pub fn read<S: Schema>(&self, key: &impl KeyCodec<S>) -> anyhow::Result<Option<S::Value>> {
+        // Some(Operation) means the key was touched:
+        // in case of a deletion we return None early.
+        // Only if no operation for the key is found
+        // do we go deeper.
+
+        // Hold the local cache lock explicitly, so reads are atomic
+        let local_cache = self
+            .cache
+            .lock()
+            .expect("SchemaBatch lock should not be poisoned");
+
+        // 1. Check in cache
+        if let Some(operation) = local_cache.read(key)? {
+            return decode_operation::<S>(operation);
+        }
+
+        // 2. Check parent
+        let parent = self
+            .parents_manager
+            .read()
+            .expect("Parent lock must not be poisoned");
+        parent.get::<S>(self.id, key)
+    }
+
+    /// Store a value in snapshot
+    pub fn put<S: Schema>(
+        &self,
+        key: &impl KeyCodec<S>,
+        value: &impl ValueCodec<S>,
+    ) -> anyhow::Result<()> {
+        self.cache
+            .lock()
+            .expect("SchemaBatch lock must not be poisoned")
+            .put(key, value)
+    }
+
+    /// Delete the given key from snapshot
+    pub fn delete<S: Schema>(&self, key: &impl KeyCodec<S>) -> anyhow::Result<()> {
+        self.cache
+            .lock()
+            .expect("SchemaBatch lock must not be poisoned")
+            .delete(key)
+    }
+}
+
+/// Read-only version of [`DbSnapshot`], for usage inside a [`QueryManager`]
+pub struct FrozenDbSnapshot {
+    id: SnapshotId,
+    cache: SchemaBatch,
+}
+
+impl FrozenDbSnapshot {
+    /// Get a value from this snapshot's own cache
+    pub fn get<S: Schema>(&self, key: &impl KeyCodec<S>) -> anyhow::Result<Option<Operation>> {
+        self.cache.read(key)
+    }
+
+    /// Get the id of this snapshot
+    pub fn get_id(&self) -> SnapshotId {
+        self.id
+    }
+}
+
+impl<Q: QueryManager> From<DbSnapshot<Q>> for FrozenDbSnapshot {
+    fn from(snapshot: DbSnapshot<Q>) -> Self {
+        Self {
+            id: snapshot.id,
+            cache: snapshot
+                .cache
+                .into_inner()
+                .expect("SchemaBatch lock must not be poisoned"),
+        }
+    }
+}
+
+impl From<FrozenDbSnapshot> for SchemaBatch {
+    fn from(value: FrozenDbSnapshot) -> Self {
+        value.cache
+    }
+}
+
+fn decode_operation<S: Schema>(operation: Operation) -> anyhow::Result<Option<S::Value>> {
+    match operation {
+        Operation::Put { value } => {
+            let value = S::Value::decode_value(&value)?;
+            Ok(Some(value))
+        }
+        Operation::Delete => Ok(None),
+    }
+}
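The snapshot module is read-oriented; nothing in this diff commits a snapshot back to RocksDB yet. The conversion chain it provides (`DbSnapshot` → `FrozenDbSnapshot` → `SchemaBatch`) suggests a commit path along the lines of this hypothetical sketch, where `M` is any `QueryManager` implementation:

```rust
use sov_schema_db::snapshot::{DbSnapshot, FrozenDbSnapshot, QueryManager};
use sov_schema_db::{SchemaBatch, DB};

// Hypothetical commit path: freeze the snapshot (dropping its Mutex),
// flatten it into a last-write-wins SchemaBatch, and write it atomically.
fn commit<M: QueryManager>(db: &DB, snapshot: DbSnapshot<M>) -> anyhow::Result<()> {
    let frozen = FrozenDbSnapshot::from(snapshot);
    let batch = SchemaBatch::from(frozen);
    db.write_schemas(batch)
}
```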
diff --git a/full-node/db/sov-schema-db/tests/db_test.rs b/full-node/db/sov-schema-db/tests/db_test.rs
index fcfb3c2b5..44d634244 100644
--- a/full-node/db/sov-schema-db/tests/db_test.rs
+++ b/full-node/db/sov-schema-db/tests/db_test.rs
@@ -209,7 +209,7 @@ fn gen_expected_values(values: &[(u32, u32)]) -> Vec<(TestField, TestField)> {
 fn test_single_schema_batch() {
     let db = TestDB::new();
 
-    let db_batch = SchemaBatch::new();
+    let mut db_batch = SchemaBatch::new();
     db_batch
         .put::<TestSchema1>(&TestField(0), &TestField(0))
         .unwrap();
@@ -247,7 +247,7 @@ fn test_single_schema_batch() {
 fn test_two_schema_batches() {
     let db = TestDB::new();
 
-    let db_batch1 = SchemaBatch::new();
+    let mut db_batch1 = SchemaBatch::new();
     db_batch1
         .put::<TestSchema1>(&TestField(0), &TestField(0))
         .unwrap();
@@ -265,7 +265,7 @@ fn test_two_schema_batches() {
         gen_expected_values(&[(0, 0), (1, 1)]),
     );
 
-    let db_batch2 = SchemaBatch::new();
+    let mut db_batch2 = SchemaBatch::new();
     db_batch2.delete::<TestSchema2>(&TestField(3)).unwrap();
     db_batch2
         .put::<TestSchema2>(&TestField(3), &TestField(3))
         .unwrap();
@@ -345,7 +345,7 @@ fn test_report_size() {
     let db = TestDB::new();
 
     for i in 0..1000 {
-        let db_batch = SchemaBatch::new();
+        let mut db_batch = SchemaBatch::new();
         db_batch
             .put::<TestSchema1>(&TestField(i), &TestField(i))
             .unwrap();
diff --git a/full-node/db/sov-schema-db/tests/snapshot_test.rs b/full-node/db/sov-schema-db/tests/snapshot_test.rs
new file mode 100644
index 000000000..24df1a7a2
--- /dev/null
+++ b/full-node/db/sov-schema-db/tests/snapshot_test.rs
@@ -0,0 +1,125 @@
+use std::sync::{Arc, RwLock};
+
+use byteorder::{BigEndian, ReadBytesExt};
+use sov_schema_db::schema::{KeyCodec, KeyDecoder, KeyEncoder, ValueCodec};
+use sov_schema_db::snapshot::{
+    DbSnapshot, FrozenDbSnapshot, QueryManager, ReadOnlyLock, SnapshotId,
+};
+use sov_schema_db::{define_schema, CodecError, Operation, Schema};
+
+define_schema!(TestSchema1, TestField, TestField, "TestCF1");
+
+#[derive(Debug, Eq, PartialEq, Clone)]
+pub(crate) struct TestField(u32);
+
+impl TestField {
+    fn to_bytes(&self) -> Vec<u8> {
+        self.0.to_be_bytes().to_vec()
+    }
+
+    fn from_bytes(data: &[u8]) -> sov_schema_db::schema::Result<Self> {
+        let mut reader = std::io::Cursor::new(data);
+        Ok(TestField(
+            reader
+                .read_u32::<BigEndian>()
+                .map_err(|e| CodecError::Wrapped(e.into()))?,
+        ))
+    }
+}
+
+impl KeyEncoder<TestSchema1> for TestField {
+    fn encode_key(&self) -> sov_schema_db::schema::Result<Vec<u8>> {
+        Ok(self.to_bytes())
+    }
+}
+
+impl KeyDecoder<TestSchema1> for TestField {
+    fn decode_key(data: &[u8]) -> sov_schema_db::schema::Result<Self> {
+        Self::from_bytes(data)
+    }
+}
+
+impl ValueCodec<TestSchema1> for TestField {
+    fn encode_value(&self) -> sov_schema_db::schema::Result<Vec<u8>> {
+        Ok(self.to_bytes())
+    }
+
+    fn decode_value(data: &[u8]) -> sov_schema_db::schema::Result<Self> {
+        Self::from_bytes(data)
+    }
+}
+
+#[derive(Default)]
+struct LinearSnapshotManager {
+    snapshots: Vec<FrozenDbSnapshot>,
+}
+
+impl LinearSnapshotManager {
+    fn add_snapshot(&mut self, snapshot: FrozenDbSnapshot) {
+        self.snapshots.push(snapshot);
+    }
+}
+
+impl QueryManager for LinearSnapshotManager {
+    fn get<S: Schema>(
+        &self,
+        snapshot_id: SnapshotId,
+        key: &impl KeyCodec<S>,
+    ) -> anyhow::Result<Option<S::Value>> {
+        for snapshot in self.snapshots[..snapshot_id as usize].iter().rev() {
+            if let Some(operation) = snapshot.get(key)? {
+                return match operation {
+                    Operation::Put { value } => Ok(Some(S::Value::decode_value(&value)?)),
+                    Operation::Delete => Ok(None),
+                };
+            }
+        }
+        Ok(None)
+    }
+}
+
+#[test]
+fn snapshot_lifecycle() {
+    let manager = Arc::new(RwLock::new(LinearSnapshotManager::default()));
+
+    let key = TestField(1);
+    let value = TestField(1);
+
+    let snapshot_1 =
+        DbSnapshot::<LinearSnapshotManager>::new(0, ReadOnlyLock::new(manager.clone()));
+    assert_eq!(
+        None,
+        snapshot_1.read::<TestSchema1>(&key).unwrap(),
+        "Incorrect value, should find nothing"
+    );
+
+    snapshot_1.put(&key, &value).unwrap();
+    assert_eq!(
+        Some(value.clone()),
+        snapshot_1.read::<TestSchema1>(&key).unwrap(),
+        "Incorrect value, should be fetched from local cache"
+    );
+    {
+        let mut manager = manager.write().unwrap();
+        manager.add_snapshot(snapshot_1.into());
+    }
+
+    // Snapshot 2: reads the value from snapshot 1, then deletes it
+    let snapshot_2 =
+        DbSnapshot::<LinearSnapshotManager>::new(1, ReadOnlyLock::new(manager.clone()));
+    assert_eq!(
+        Some(value.clone()),
+        snapshot_2.read::<TestSchema1>(&key).unwrap()
+    );
+    snapshot_2.delete(&key).unwrap();
+    assert_eq!(None, snapshot_2.read::<TestSchema1>(&key).unwrap());
+    {
+        let mut manager = manager.write().unwrap();
+        manager.add_snapshot(snapshot_2.into());
+    }
+
+    // Snapshot 3: gets an empty result, even though the value exists in an
+    // earlier snapshot, because snapshot 2 recorded its deletion
+    let snapshot_3 =
+        DbSnapshot::<LinearSnapshotManager>::new(2, ReadOnlyLock::new(manager.clone()));
+    assert_eq!(None, snapshot_3.read::<TestSchema1>(&key).unwrap());
+}
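For contrast with `LinearSnapshotManager`, the base case of the `QueryManager` trait is a manager with no parents at all: every read that misses the snapshot's own cache simply misses. A sketch (the `EmptyManager` name is illustrative, not part of the crate):

```rust
use sov_schema_db::schema::KeyCodec;
use sov_schema_db::snapshot::{QueryManager, SnapshotId};
use sov_schema_db::Schema;

struct EmptyManager;

impl QueryManager for EmptyManager {
    fn get<S: Schema>(
        &self,
        _snapshot_id: SnapshotId,
        _key: &impl KeyCodec<S>,
    ) -> anyhow::Result<Option<S::Value>> {
        // No parents to consult: a miss in the snapshot cache is a miss overall.
        Ok(None)
    }
}
```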