Skip to content

Commit

Permalink
Add DbSnapshot to sov-db-schema (#1112)
Browse files Browse the repository at this point in the history
* [no ci] Switch to HashMap of last operation from vec

* [no ci] Adding a DbSnapshot

* [no ci] Remove lock from inside SchemaBatch

* [no ci] Fix lint and add FrozenDbSnapshot

* Add test for snapshot lifecycle

* Formatting

* Remove unrelated TODO

* I want bigger coverage

* Fix lint!

* Simplify snapshot get

* Lint fix and explicit hold of the lock

* Change QueryManager interface and DbSnapshot implementation
  • Loading branch information
citizen-stig authored Oct 30, 2023
1 parent a0a30f6 commit 6f7cb3d
Show file tree
Hide file tree
Showing 5 changed files with 354 additions and 48 deletions.
2 changes: 1 addition & 1 deletion full-node/db/sov-db/src/native_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl NativeDB {
&self,
key_value_pairs: impl IntoIterator<Item = (Vec<u8>, Option<Vec<u8>>)>,
) -> anyhow::Result<()> {
let batch = SchemaBatch::default();
let mut batch = SchemaBatch::default();
for (key, value) in key_value_pairs {
batch.put::<ModuleAccessoryState>(&key, &value)?;
}
Expand Down
121 changes: 78 additions & 43 deletions full-node/db/sov-schema-db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
mod iterator;
mod metrics;
pub mod schema;
pub mod snapshot;

use std::collections::HashMap;
use std::path::Path;
use std::sync::Mutex;

use anyhow::format_err;
use iterator::ScanDirection;
Expand Down Expand Up @@ -144,7 +144,7 @@ impl DB {
) -> anyhow::Result<()> {
// Not necessary to use a batch, but we'd like a central place to bump counters.
// Used in tests only anyway.
let batch = SchemaBatch::new();
let mut batch = SchemaBatch::new();
batch.put::<S>(key, value)?;
self.write_schemas(batch)
}
Expand Down Expand Up @@ -179,15 +179,13 @@ impl DB {
let _timer = SCHEMADB_BATCH_COMMIT_LATENCY_SECONDS
.with_label_values(&[self.name])
.start_timer();
let rows_locked = batch.rows.lock().expect("Lock must not be poisoned");

let mut db_batch = rocksdb::WriteBatch::default();
for (cf_name, rows) in rows_locked.iter() {
for (cf_name, rows) in batch.last_writes.iter() {
let cf_handle = self.get_cf_handle(cf_name)?;
for write_op in rows {
match write_op {
WriteOp::Value { key, value } => db_batch.put_cf(cf_handle, key, value),
WriteOp::Deletion { key } => db_batch.delete_cf(cf_handle, key),
for (key, operation) in rows {
match operation {
Operation::Put { value } => db_batch.put_cf(cf_handle, key, value),
Operation::Delete => db_batch.delete_cf(cf_handle, key),
}
}
}
Expand All @@ -196,15 +194,15 @@ impl DB {
self.inner.write_opt(db_batch, &default_write_options())?;

// Bump counters only after DB write succeeds.
for (cf_name, rows) in rows_locked.iter() {
for write_op in rows {
match write_op {
WriteOp::Value { key, value } => {
for (cf_name, rows) in batch.last_writes.iter() {
for (key, operation) in rows {
match operation {
Operation::Put { value } => {
SCHEMADB_PUT_BYTES
.with_label_values(&[cf_name])
.observe((key.len() + value.len()) as f64);
}
WriteOp::Deletion { key: _ } => {
Operation::Delete => {
SCHEMADB_DELETES.with_label_values(&[cf_name]).inc();
}
}
Expand Down Expand Up @@ -253,19 +251,28 @@ impl DB {
}
}

type SchemaKey = Vec<u8>;
type SchemaValue = Vec<u8>;

#[cfg_attr(feature = "arbitrary", derive(proptest_derive::Arbitrary))]
#[derive(Debug, PartialEq, Eq, Hash)]
enum WriteOp {
Value { key: Vec<u8>, value: Vec<u8> },
Deletion { key: Vec<u8> },
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
/// Represents operation written to the database
pub enum Operation {
/// Writing a value to the DB.
Put {
/// Value to write
value: SchemaValue,
},
/// Deleting a value
Delete,
}

/// [`SchemaBatch`] holds a collection of updates that can be applied to a DB
/// ([`Schema`]) atomically. The updates will be applied in the order in which
/// they are added to the [`SchemaBatch`].
#[derive(Debug, Default)]
pub struct SchemaBatch {
rows: Mutex<HashMap<ColumnFamilyName, Vec<WriteOp>>>,
last_writes: HashMap<ColumnFamilyName, HashMap<SchemaKey, Operation>>,
}

impl SchemaBatch {
Expand All @@ -276,60 +283,66 @@ impl SchemaBatch {

/// Adds an insert/update operation to the batch.
pub fn put<S: Schema>(
&self,
&mut self,
key: &impl KeyCodec<S>,
value: &impl ValueCodec<S>,
) -> anyhow::Result<()> {
let _timer = SCHEMADB_BATCH_PUT_LATENCY_SECONDS
.with_label_values(&["unknown"])
.start_timer();
let key = key.encode_key()?;
let value = value.encode_value()?;
self.rows
.lock()
.expect("Lock must not be poisoned")
.entry(S::COLUMN_FAMILY_NAME)
.or_default()
.push(WriteOp::Value { key, value });

let put_operation = Operation::Put {
value: value.encode_value()?,
};
self.insert_operation::<S>(key, put_operation);
Ok(())
}

/// Adds a delete operation to the batch.
pub fn delete<S: Schema>(&self, key: &impl KeyCodec<S>) -> anyhow::Result<()> {
pub fn delete<S: Schema>(&mut self, key: &impl KeyCodec<S>) -> anyhow::Result<()> {
let key = key.encode_key()?;
self.rows
.lock()
.expect("Lock must not be poisoned")
.entry(S::COLUMN_FAMILY_NAME)
.or_default()
.push(WriteOp::Deletion { key });
self.insert_operation::<S>(key, Operation::Delete);

Ok(())
}

fn insert_operation<S: Schema>(&mut self, key: SchemaKey, operation: Operation) {
let column_writes = self.last_writes.entry(S::COLUMN_FAMILY_NAME).or_default();
column_writes.insert(key, operation);
}

#[allow(dead_code)]
pub(crate) fn read<S: Schema>(
&self,
key: &impl KeyCodec<S>,
) -> anyhow::Result<Option<Operation>> {
let key = key.encode_key()?;
if let Some(column_writes) = self.last_writes.get(&S::COLUMN_FAMILY_NAME) {
return Ok(column_writes.get(&key).cloned());
}
Ok(None)
}
}

#[cfg(feature = "arbitrary")]
impl proptest::arbitrary::Arbitrary for SchemaBatch {
type Parameters = &'static [ColumnFamilyName];
type Strategy = proptest::strategy::BoxedStrategy<Self>;

fn arbitrary_with(columns: Self::Parameters) -> Self::Strategy {
use proptest::prelude::any;
use proptest::strategy::Strategy;

proptest::collection::vec(any::<Vec<WriteOp>>(), columns.len())
proptest::collection::vec(any::<HashMap<SchemaKey, Operation>>(), columns.len())
.prop_map::<SchemaBatch, _>(|vec_vec_write_ops| {
let mut rows = HashMap::new();
for (col, write_ops) in columns.iter().zip(vec_vec_write_ops.into_iter()) {
rows.insert(*col, write_ops);
}
SchemaBatch {
rows: Mutex::new(rows),
for (col, write_op) in columns.iter().zip(vec_vec_write_ops.into_iter()) {
rows.insert(*col, write_op);
}
SchemaBatch { last_writes: rows }
})
.boxed()
}

type Strategy = proptest::strategy::BoxedStrategy<Self>;
}

/// An error that occurred during (de)serialization of a [`Schema`]'s keys or
Expand Down Expand Up @@ -358,3 +371,25 @@ fn default_write_options() -> rocksdb::WriteOptions {
opts.set_sync(true);
opts
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test for the `Debug` impl of [`DB`]: the formatted output should
    /// expose both the database name passed to `open` and the filesystem path
    /// it was opened at, so operators can identify the instance in logs.
    #[test]
    fn test_db_debug_output() {
        // Temporary directory is cleaned up when `tmpdir` is dropped.
        let tmpdir = tempfile::tempdir().unwrap();
        let column_families = vec![DEFAULT_COLUMN_FAMILY_NAME];

        let mut db_opts = rocksdb::Options::default();
        // Both flags are required so `open` creates a fresh DB (and its
        // column families) instead of failing on the empty directory.
        db_opts.create_if_missing(true);
        db_opts.create_missing_column_families(true);

        let db = DB::open(tmpdir.path(), "test_db_debug", column_families, &db_opts)
            .expect("Failed to open DB.");

        let db_debug = format!("{:?}", db);
        assert!(db_debug.contains("test_db_debug"));
        assert!(db_debug.contains(tmpdir.path().to_str().unwrap()));
    }
}
146 changes: 146 additions & 0 deletions full-node/db/sov-schema-db/src/snapshot.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
//! Snapshot related logic
use std::sync::{Arc, LockResult, Mutex, RwLock, RwLockReadGuard};

use crate::schema::{KeyCodec, ValueCodec};
use crate::{Operation, Schema, SchemaBatch};

/// Id of database snapshot
pub type SnapshotId = u64;

/// A trait to make nested calls to several [`SchemaBatch`]s and eventually [`crate::DB`].
///
/// Implementors resolve a read against the chain of snapshots identified by
/// `snapshot_id` (and, ultimately, the backing database). Used by
/// [`DbSnapshot::read`] when a key is not found in the snapshot's own cache.
pub trait QueryManager {
    /// Get a value for `key` from the snapshot identified by `snapshot_id`
    /// or its parents.
    ///
    /// Returns `Ok(None)` when the key is absent everywhere in the chain.
    fn get<S: Schema>(
        &self,
        snapshot_id: SnapshotId,
        key: &impl KeyCodec<S>,
    ) -> anyhow::Result<Option<S::Value>>;
}

/// Simple wrapper around `RwLock` that only allows read access.
///
/// Holding a `ReadOnlyLock<T>` statically guarantees the holder can never
/// take the write side of the shared lock, while writers elsewhere keep
/// using the original `Arc<RwLock<T>>`.
pub struct ReadOnlyLock<T> {
    // Shared with writers; this wrapper simply never exposes `.write()`.
    lock: Arc<RwLock<T>>,
}

impl<T> ReadOnlyLock<T> {
/// Create new [`ReadOnlyLock`] from [`Arc<RwLock<T>>`].
pub fn new(lock: Arc<RwLock<T>>) -> Self {
Self { lock }
}

/// Acquires a read lock on the underlying `RwLock`.
pub fn read(&self) -> LockResult<RwLockReadGuard<'_, T>> {
self.lock.read()
}
}

/// Wrapper around [`QueryManager`] that allows to read from snapshots.
///
/// Writes (`put`/`delete`) accumulate in a local [`SchemaBatch`] cache;
/// reads consult that cache first and fall back to the parent chain via
/// the query manager.
pub struct DbSnapshot<Q> {
    // Identifier passed to the parent manager so it can locate this
    // snapshot's ancestors in the chain.
    id: SnapshotId,
    // Uncommitted writes local to this snapshot; the Mutex allows `put`/
    // `delete` through a shared `&self`.
    cache: Mutex<SchemaBatch>,
    // Read-only access to the manager that resolves misses against parents.
    parents_manager: ReadOnlyLock<Q>,
}

impl<Q: QueryManager> DbSnapshot<Q> {
    /// Create new [`DbSnapshot`] with the given id and an empty local cache.
    pub fn new(id: SnapshotId, manager: ReadOnlyLock<Q>) -> Self {
        Self {
            id,
            cache: Mutex::new(SchemaBatch::default()),
            parents_manager: manager,
        }
    }

    /// Get a value from current snapshot, its parents or underlying database.
    ///
    /// Resolution order: the local cache first, then the parent chain via the
    /// [`QueryManager`]. A cached `Delete` yields `Ok(None)` without
    /// consulting parents (the deletion shadows any ancestor value).
    pub fn read<S: Schema>(&self, key: &impl KeyCodec<S>) -> anyhow::Result<Option<S::Value>> {
        // Some(Operation) means that key was touched,
        // but in case of deletion we early return None
        // Only in case of not finding operation for key,
        // we go deeper

        // Hold local cache lock explicitly, so reads are atomic:
        // the guard stays alive through the parent lookup below, so no
        // concurrent `put`/`delete` can interleave with this read.
        let local_cache = self
            .cache
            .lock()
            .expect("SchemaBatch lock should not be poisoned");

        // 1. Check in cache
        if let Some(operation) = local_cache.read(key)? {
            return decode_operation::<S>(operation);
        }

        // 2. Check parent
        // NOTE: acquired while still holding the cache lock — lock order is
        // cache, then parent manager; keep that order elsewhere to avoid
        // deadlock.
        let parent = self
            .parents_manager
            .read()
            .expect("Parent lock must not be poisoned");
        parent.get::<S>(self.id, key)
    }

    /// Store a value in snapshot.
    ///
    /// Only records the write in the local cache; nothing reaches the
    /// underlying database until the snapshot is frozen and committed.
    pub fn put<S: Schema>(
        &self,
        key: &impl KeyCodec<S>,
        value: &impl ValueCodec<S>,
    ) -> anyhow::Result<()> {
        self.cache
            .lock()
            .expect("SchemaBatch lock must not be poisoned")
            .put(key, value)
    }

    /// Delete given key from snapshot.
    ///
    /// Records a `Delete` operation in the local cache, shadowing any value
    /// present in parents or the database for subsequent `read`s.
    pub fn delete<S: Schema>(&self, key: &impl KeyCodec<S>) -> anyhow::Result<()> {
        self.cache
            .lock()
            .expect("SchemaBatch lock must not be poisoned")
            .delete(key)
    }
}

/// Read only version of [`DbSnapshot`], for usage inside [`QueryManager`].
///
/// Freezing (via `From<DbSnapshot<Q>>`) drops the parent-manager reference,
/// so a frozen snapshot can only answer queries from its own cache.
pub struct FrozenDbSnapshot {
    id: SnapshotId,
    // No Mutex needed: the snapshot is immutable after freezing.
    cache: SchemaBatch,
}

impl FrozenDbSnapshot {
    /// Looks up the last [`Operation`] recorded for `key` in this snapshot's
    /// own cache. Parents and the underlying database are not consulted.
    pub fn get<S: Schema>(&self, key: &impl KeyCodec<S>) -> anyhow::Result<Option<Operation>> {
        let local_cache = &self.cache;
        local_cache.read(key)
    }

    /// Returns the identifier this snapshot was created with.
    pub fn get_id(&self) -> SnapshotId {
        let id = self.id;
        id
    }
}

impl<Q> From<DbSnapshot<Q>> for FrozenDbSnapshot {
fn from(snapshot: DbSnapshot<Q>) -> Self {
Self {
id: snapshot.id,
cache: snapshot
.cache
.into_inner()
.expect("SchemaBatch lock must not be poisoned"),
}
}
}

impl From<FrozenDbSnapshot> for SchemaBatch {
fn from(value: FrozenDbSnapshot) -> Self {
value.cache
}
}

/// Translates a raw cache [`Operation`] into a typed read result for schema `S`.
///
/// A `Put` payload is decoded via the schema's value codec into `Some(value)`;
/// a `Delete` tombstone maps to `None`. Decoding failures propagate as errors.
fn decode_operation<S: Schema>(operation: Operation) -> anyhow::Result<Option<S::Value>> {
    Ok(match operation {
        Operation::Put { value } => Some(S::Value::decode_value(&value)?),
        Operation::Delete => None,
    })
}
Loading

0 comments on commit 6f7cb3d

Please sign in to comment.