diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..855eee9 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,74 @@ +name: CI + +on: + push: + branches: ["main"] + pull_request: + branches: ["main"] + +env: + CARGO_TERM_COLOR: always + RUSTFLAGS: -D warnings + +jobs: + check-aggregate: + # Follows the guide at . + runs-on: ubuntu-latest + if: always() + needs: + # If you're modifying this workflow file and you're adding/removing a job + # which should be required to pass before merging a PR, don't forget to + # update this list! + - check + - features + - test + steps: + - name: Compute whether the needed jobs succeeded or failed + uses: re-actors/alls-green@release/v1 + with: + allowed-skips: deploy-github-pages + jobs: ${{ toJSON(needs) }} + + check: + name: check + runs-on: buildjet-4vcpu-ubuntu-2204 + timeout-minutes: 60 + steps: + - uses: actions/checkout@v3 + - name: Install Rust + run: rustup show + - uses: Swatinem/rust-cache@v2 + - name: Check formatting + run: cargo fmt --all --check + - run: cargo check --all-targets --all-features + - run: cargo clippy --all-targets --all-features + + # Check that every combination of features is working properly. + features: + name: features + runs-on: buildjet-8vcpu-ubuntu-2204 + timeout-minutes: 120 + steps: + - uses: actions/checkout@v3 + - name: Install Rust + run: rustup show + - name: cargo install cargo-hack + uses: taiki-e/install-action@cargo-hack + - uses: Swatinem/rust-cache@v2 + - name: cargo hack + run: cargo hack check --workspace --feature-powerset --all-targets + test: + name: test + runs-on: buildjet-4vcpu-ubuntu-2204 + timeout-minutes: 60 + steps: + - uses: actions/checkout@v3 + - uses: taiki-e/install-action@nextest + - name: Install Rust + run: rustup show + - uses: Swatinem/rust-cache@v2 + - run: cargo nextest run --workspace --all-features + # `cargo-nextest` does not support doctests (yet?), so we have to run them + # separately. + # TODO: https://github.com/nextest-rs/nextest/issues/16 + - run: cargo test --workspace --doc --all-features diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..dfc577c --- /dev/null +++ b/.gitignore @@ -0,0 +1,16 @@ +# From . + +# Generated by Cargo +# will have compiled files and executables +debug/ +target/ + +# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries +# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html +Cargo.lock + +# These are backup files generated by rustfmt +**/*.rs.bk + +# MSVC Windows builds of rustc generate these, which store debugging information +*.pdb diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..e571b09 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,36 @@ +[package] +name = "rockbound" +version = "1.0.0" +edition = "2021" +authors = ["Sovereign Labs"] +homepage = "https://github.com/sovereign-labs/rockbound" +repository = "https://github.com/sovereign-labs/rockbound" +description = "A low level interface transforming RocksDB into a type-oriented data store" + +# Some of the code is derived from Aptos +# which is licensed under Apache-2.0. 
+license = "Apache-2.0"
+
+readme = "README.md"
+
+[dependencies]
+anyhow = "1"
+byteorder = { version = "1", default-features = true, optional = true }
+once_cell = "1"
+prometheus = { version = "0.13", optional = true }
+proptest = { version = "1", optional = true }
+proptest-derive = { version = "0.4", optional = true }
+rocksdb = { version = "0.21" }
+thiserror = "1"
+tracing = "0.1"
+
+[dev-dependencies]
+rockbound = { path = ".", features = ["test-utils"] }
+tempfile = "3"
+
+[features]
+default = []
+
+arbitrary = ["dep:proptest", "dep:proptest-derive"]
+prometheus = ["dep:prometheus"]
+test-utils = ["dep:byteorder"]
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..f92bf67
--- /dev/null
+++ b/README.md
@@ -0,0 +1,60 @@
+# Rockbound
+
+This package is a low-level wrapper transforming [RocksDB](https://rocksdb.org/) from a byte-oriented key-value store into a
+type-oriented store. It's adapted from a similar package in Aptos-Core.
+
+The most important concept exposed by Rockbound is a `Schema`, which maps a column-family name to
+type-safe codecs for the key and value.
+
+```rust
+pub trait Schema {
+    /// The column family name associated with this struct.
+    /// Note: all schemas within the same DB must have distinct column family names.
+    const COLUMN_FAMILY_NAME: &'static str;
+
+    /// Type of the key.
+    type Key: KeyCodec<Self>;
+
+    /// Type of the value.
+    type Value: ValueCodec<Self>;
+}
+```
+
+Using this schema, we can write generic functions for storing and fetching typed data, and ensure that
+it's always encoded/decoded in the way we expect.
+
+```rust
+impl SchemaDB {
+    pub fn put<S: Schema>(&self, key: &impl KeyCodec<S>, value: &impl ValueCodec<S>) -> Result<()> {
+        let key_bytes = key.encode_key()?;
+        let value_bytes = value.encode_value()?;
+        self.rocks_db_handle.put(S::COLUMN_FAMILY_NAME, key_bytes, value_bytes)
+    }
+}
+```
+
+To actually store and retrieve data, all we need to do is implement a `Schema`:
+
+```rust
+pub struct AccountBalanceSchema;
+
+impl Schema for AccountBalanceSchema {
+    const COLUMN_FAMILY_NAME: &'static str = "account_balances";
+    type Key = Account;
+    type Value = u64;
+}
+
+impl KeyCodec<AccountBalanceSchema> for Account {
+    fn encode_key(&self) -> Vec<u8> {
+        bincode::serialize(self).expect("serialization cannot fail")
+    }
+
+    fn decode_key(key: Vec<u8>) -> Self {
+        // elided
+    }
+}
+
+impl ValueCodec<AccountBalanceSchema> for u64 {
+    // elided
+}
+```
diff --git a/src/iterator.rs b/src/iterator.rs
new file mode 100644
index 0000000..0a3d7ae
--- /dev/null
+++ b/src/iterator.rs
@@ -0,0 +1,192 @@
+use std::iter::FusedIterator;
+use std::marker::PhantomData;
+
+use anyhow::Result;
+
+use crate::metrics::{ROCKBOUND_ITER_BYTES, ROCKBOUND_ITER_LATENCY_SECONDS};
+use crate::schema::{KeyDecoder, Schema, ValueCodec};
+use crate::{SchemaKey, SchemaValue};
+
+/// This defines a type that can be used to seek a [`SchemaIterator`], via
+/// interfaces like [`SchemaIterator::seek`]. Note that not all
+/// [`KeyEncoder`](crate::schema::KeyEncoder)s are [`SeekKeyEncoder`]s, and
+/// vice versa. E.g.:
+///
+/// - Some key types don't use an encoding that results in sensible
+///   seeking behavior under lexicographic ordering (what RocksDB uses by
+///   default), which means you shouldn't implement [`SeekKeyEncoder`] at all.
+/// - Other key types might maintain full lexicographic order, which means the
+///   original key type can also be [`SeekKeyEncoder`].
+/// - Other key types may be composite, and the first field alone may be
+///   a good candidate for [`SeekKeyEncoder`].
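To make the composite case concrete, here is a hedged sketch (not part of the patch itself) of a prefix type that seeks by the first field of a composite key. It mirrors `KeyPrefix1` from `src/test.rs` later in this diff; the `HeightPrefix` name and the blanket impl are assumptions made only for illustration.

```rust
use rockbound::schema::Schema;
use rockbound::SeekKeyEncoder;

/// Hypothetical prefix over the first `u32` of a composite `(u32, u32, u32)` key.
pub struct HeightPrefix(pub u32);

// Blanket impl purely for illustration; a real schema would usually implement
// this only for its own key and prefix types.
impl<S: Schema> SeekKeyEncoder<S> for HeightPrefix {
    fn encode_seek_key(&self) -> rockbound::schema::Result<Vec<u8>> {
        // Big-endian bytes sort the same way as the numbers they encode,
        // so seeking lands at the start of the matching key range.
        Ok(self.0.to_be_bytes().to_vec())
    }
}
```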
+pub trait SeekKeyEncoder<S: Schema>: Sized {
+    /// Converts `self` to bytes which is used to seek the underlying raw
+    /// iterator.
+    ///
+    /// If `self` is also a [`KeyEncoder`](crate::schema::KeyEncoder), then
+    /// [`SeekKeyEncoder::encode_seek_key`] MUST return the same bytes as
+    /// [`KeyEncoder::encode_key`](crate::schema::KeyEncoder::encode_key).
+    fn encode_seek_key(&self) -> crate::schema::Result<Vec<u8>>;
+}
+
+pub(crate) enum ScanDirection {
+    Forward,
+    Backward,
+}
+
+/// DB Iterator parameterized on [`Schema`] that seeks with [`Schema::Key`] and yields
+/// [`Schema::Key`] and [`Schema::Value`] pairs.
+pub struct SchemaIterator<'a, S> {
+    db_iter: rocksdb::DBRawIterator<'a>,
+    direction: ScanDirection,
+    phantom: PhantomData<S>,
+}
+
+impl<'a, S> SchemaIterator<'a, S>
+where
+    S: Schema,
+{
+    pub(crate) fn new(db_iter: rocksdb::DBRawIterator<'a>, direction: ScanDirection) -> Self {
+        SchemaIterator {
+            db_iter,
+            direction,
+            phantom: PhantomData,
+        }
+    }
+
+    /// Seeks to the first key.
+    pub fn seek_to_first(&mut self) {
+        self.db_iter.seek_to_first();
+    }
+
+    /// Seeks to the last key.
+    pub fn seek_to_last(&mut self) {
+        self.db_iter.seek_to_last();
+    }
+
+    /// Seeks to the first key whose binary representation is equal to or greater than that of the
+    /// `seek_key`.
+    pub fn seek(&mut self, seek_key: &impl SeekKeyEncoder<S>) -> Result<()> {
+        let key = seek_key.encode_seek_key()?;
+        self.db_iter.seek(&key);
+        Ok(())
+    }
+
+    /// Seeks to the last key whose binary representation is less than or equal to that of the
+    /// `seek_key`.
+    ///
+    /// See example in [`RocksDB doc`](https://github.com/facebook/rocksdb/wiki/SeekForPrev).
+    pub fn seek_for_prev(&mut self, seek_key: &impl SeekKeyEncoder<S>) -> Result<()> {
+        let key = seek_key.encode_seek_key()?;
+        self.db_iter.seek_for_prev(&key);
+        Ok(())
+    }
+
+    /// Reverses iterator direction.
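Since the direction flag only changes which way the iterator steps, a reverse scan is just `iter()` plus `rev()` and a seek to the far end. A hedged usage sketch, generic over any schema; `collect_descending` is an illustrative helper, not part of the crate:

```rust
use rockbound::{Schema, DB};

// Illustrative helper: collect a whole column family in descending key order.
fn collect_descending<S: Schema>(db: &DB) -> anyhow::Result<Vec<(S::Key, S::Value)>> {
    let mut iter = db.iter::<S>()?.rev(); // now steps backwards
    iter.seek_to_last();                  // start from the largest key
    iter.map(|res| res.map(|item| item.into_tuple()))
        .collect()
}
```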
+ pub fn rev(self) -> Self { + let new_direction = match self.direction { + ScanDirection::Forward => ScanDirection::Backward, + ScanDirection::Backward => ScanDirection::Forward, + }; + SchemaIterator { + db_iter: self.db_iter, + direction: new_direction, + phantom: Default::default(), + } + } + + fn next_impl(&mut self) -> Result>> { + let _timer = ROCKBOUND_ITER_LATENCY_SECONDS + .with_label_values(&[S::COLUMN_FAMILY_NAME]) + .start_timer(); + + if !self.db_iter.valid() { + self.db_iter.status()?; + return Ok(None); + } + + let raw_key = self.db_iter.key().expect("db_iter.key() failed."); + let raw_value = self.db_iter.value().expect("db_iter.value() failed."); + let value_size_bytes = raw_value.len(); + ROCKBOUND_ITER_BYTES + .with_label_values(&[S::COLUMN_FAMILY_NAME]) + .observe((raw_key.len() + raw_value.len()) as f64); + + let key = >::decode_key(raw_key)?; + let value = >::decode_value(raw_value)?; + + match self.direction { + ScanDirection::Forward => self.db_iter.next(), + ScanDirection::Backward => self.db_iter.prev(), + } + + Ok(Some(IteratorOutput { + key, + value, + value_size_bytes, + })) + } +} + +/// The output of [`SchemaIterator`]'s next_impl +pub struct IteratorOutput { + pub key: K, + pub value: V, + pub value_size_bytes: usize, +} + +impl IteratorOutput { + pub fn into_tuple(self) -> (K, V) { + (self.key, self.value) + } +} + +impl<'a, S> Iterator for SchemaIterator<'a, S> +where + S: Schema, +{ + type Item = Result>; + + fn next(&mut self) -> Option { + self.next_impl().transpose() + } +} + +impl<'a, S> FusedIterator for SchemaIterator<'a, S> where S: Schema {} + +/// Iterates over given column backwards +pub struct RawDbReverseIterator<'a> { + db_iter: rocksdb::DBRawIterator<'a>, +} + +impl<'a> RawDbReverseIterator<'a> { + pub(crate) fn new(mut db_iter: rocksdb::DBRawIterator<'a>) -> Self { + db_iter.seek_to_last(); + RawDbReverseIterator { db_iter } + } + + /// Navigate iterator go given key + pub fn seek(&mut self, seek_key: SchemaKey) -> Result<()> { + self.db_iter.seek_for_prev(&seek_key); + Ok(()) + } +} + +impl<'a> Iterator for RawDbReverseIterator<'a> { + type Item = (SchemaKey, SchemaValue); + + fn next(&mut self) -> Option { + if !self.db_iter.valid() { + self.db_iter.status().ok()?; + return None; + } + + let next_item = self.db_iter.item().expect("db_iter.key() failed."); + // Have to allocate to fix lifetime issue + let next_item = (next_item.0.to_vec(), next_item.1.to_vec()); + + self.db_iter.prev(); + + Some(next_item) + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..a89aaaf --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,357 @@ +// SPDX-License-Identifier: Apache-2.0 +// Adapted from aptos-core/schemadb + +#![forbid(unsafe_code)] +#![deny(missing_docs)] + +//! This library implements a schematized DB on top of [RocksDB](https://rocksdb.org/). It makes +//! sure all data passed in and out are structured according to predefined schemas and prevents +//! access to raw keys and values. This library also enforces a set of specific DB options, +//! like custom comparators and schema-to-column-family mapping. +//! +//! It requires that different kinds of key-value pairs be stored in separate column +//! families. To use this library to store a kind of key-value pairs, the user needs to use the +//! [`define_schema!`] macro to define the schema name, the types of key and value, and name of the +//! column family. 
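Concretely, the workflow described above looks roughly like the following hedged sketch. It mirrors the doc example in `src/schema.rs` later in this diff; the `PersonAgeByName` schema and the on-disk path are illustrative assumptions, not part of the crate.

```rust
use anyhow::Context;
use rockbound::schema::{KeyDecoder, KeyEncoder, Result, ValueCodec};
use rockbound::{define_schema, DB, DEFAULT_COLUMN_FAMILY_NAME};

define_schema!(PersonAgeByName, String, u32, "person_age_by_name");

impl KeyEncoder<PersonAgeByName> for String {
    fn encode_key(&self) -> Result<Vec<u8>> {
        Ok(self.as_bytes().to_vec())
    }
}

impl KeyDecoder<PersonAgeByName> for String {
    fn decode_key(data: &[u8]) -> Result<Self> {
        Ok(String::from_utf8(data.to_vec()).context("Can't read key")?)
    }
}

impl ValueCodec<PersonAgeByName> for u32 {
    fn encode_value(&self) -> Result<Vec<u8>> {
        Ok(self.to_le_bytes().to_vec())
    }

    fn decode_value(data: &[u8]) -> Result<Self> {
        let mut buf = [0u8; 4];
        buf.copy_from_slice(data);
        Ok(u32::from_le_bytes(buf))
    }
}

fn demo() -> anyhow::Result<()> {
    let mut opts = rockbound::rocksdb::Options::default();
    opts.create_if_missing(true);
    opts.create_missing_column_families(true);
    // The path is a placeholder; any writable directory works.
    let db = DB::open(
        "/tmp/rockbound-demo",
        "demo",
        vec![DEFAULT_COLUMN_FAMILY_NAME, "person_age_by_name"],
        &opts,
    )?;
    db.put::<PersonAgeByName>(&"alice".to_string(), &32)?;
    assert_eq!(db.get::<PersonAgeByName>(&"alice".to_string())?, Some(32));
    Ok(())
}
```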
+ +mod iterator; +mod metrics; +pub mod schema; +mod schema_batch; +pub mod snapshot; +#[cfg(feature = "test-utils")] +pub mod test; + +use std::path::Path; + +use anyhow::format_err; +use iterator::ScanDirection; +pub use iterator::{RawDbReverseIterator, SchemaIterator, SeekKeyEncoder}; +use metrics::{ + ROCKBOUND_BATCH_COMMIT_BYTES, ROCKBOUND_BATCH_COMMIT_LATENCY_SECONDS, ROCKBOUND_DELETES, + ROCKBOUND_GET_BYTES, ROCKBOUND_GET_LATENCY_SECONDS, ROCKBOUND_PUT_BYTES, +}; +pub use rocksdb; +use rocksdb::ReadOptions; +pub use rocksdb::DEFAULT_COLUMN_FAMILY_NAME; +use thiserror::Error; +use tracing::info; + +pub use crate::schema::Schema; +use crate::schema::{ColumnFamilyName, KeyCodec, ValueCodec}; +pub use crate::schema_batch::{SchemaBatch, SchemaBatchIterator}; + +/// This DB is a schematized RocksDB wrapper where all data passed in and out are typed according to +/// [`Schema`]s. +#[derive(Debug)] +pub struct DB { + name: &'static str, // for logging + inner: rocksdb::DB, +} + +impl DB { + /// Opens a database backed by RocksDB, using the provided column family names and default + /// column family options. + pub fn open( + path: impl AsRef, + name: &'static str, + column_families: impl IntoIterator>, + db_opts: &rocksdb::Options, + ) -> anyhow::Result { + let db = DB::open_with_cfds( + db_opts, + path, + name, + column_families.into_iter().map(|cf_name| { + let mut cf_opts = rocksdb::Options::default(); + cf_opts.set_compression_type(rocksdb::DBCompressionType::Lz4); + rocksdb::ColumnFamilyDescriptor::new(cf_name, cf_opts) + }), + )?; + Ok(db) + } + + /// Open RocksDB with the provided column family descriptors. + /// This allows to configure options for each column family. + pub fn open_with_cfds( + db_opts: &rocksdb::Options, + path: impl AsRef, + name: &'static str, + cfds: impl IntoIterator, + ) -> anyhow::Result { + let inner = rocksdb::DB::open_cf_descriptors(db_opts, path, cfds)?; + Ok(Self::log_construct(name, inner)) + } + + /// Open db in readonly mode. This db is completely static, so any writes that occur on the primary + /// after it has been opened will not be visible to the readonly instance. + pub fn open_cf_readonly( + opts: &rocksdb::Options, + path: impl AsRef, + name: &'static str, + cfs: Vec, + ) -> anyhow::Result { + let error_if_log_file_exists = false; + let inner = rocksdb::DB::open_cf_for_read_only(opts, path, cfs, error_if_log_file_exists)?; + + Ok(Self::log_construct(name, inner)) + } + + /// Open db in secondary mode. A secondary db is does not support writes, but can be dynamically caught up + /// to the primary instance by a manual call. See + /// for more details. + pub fn open_cf_as_secondary>( + opts: &rocksdb::Options, + primary_path: P, + secondary_path: P, + name: &'static str, + cfs: Vec, + ) -> anyhow::Result { + let inner = rocksdb::DB::open_cf_as_secondary(opts, primary_path, secondary_path, cfs)?; + Ok(Self::log_construct(name, inner)) + } + + fn log_construct(name: &'static str, inner: rocksdb::DB) -> DB { + info!(rocksdb_name = name, "Opened RocksDB."); + DB { name, inner } + } + + /// Reads single record by key. 
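A short sketch of the read-only mode described above, mirroring `open_db_read_only` in `tests/db_test.rs` later in this diff; the handle name string is arbitrary:

```rust
use rockbound::{DB, DEFAULT_COLUMN_FAMILY_NAME};

// Open an existing database without write access; `put` on this handle will error.
fn open_readonly(path: &std::path::Path) -> anyhow::Result<DB> {
    DB::open_cf_readonly(
        &rockbound::rocksdb::Options::default(),
        path,
        "example_readonly",
        vec![DEFAULT_COLUMN_FAMILY_NAME.to_string()],
    )
}
```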
+ pub fn get( + &self, + schema_key: &impl KeyCodec, + ) -> anyhow::Result> { + let _timer = ROCKBOUND_GET_LATENCY_SECONDS + .with_label_values(&[S::COLUMN_FAMILY_NAME]) + .start_timer(); + + let k = schema_key.encode_key()?; + let cf_handle = self.get_cf_handle(S::COLUMN_FAMILY_NAME)?; + + let result = self.inner.get_pinned_cf(cf_handle, k)?; + ROCKBOUND_GET_BYTES + .with_label_values(&[S::COLUMN_FAMILY_NAME]) + .observe(result.as_ref().map_or(0.0, |v| v.len() as f64)); + + result + .map(|raw_value| >::decode_value(&raw_value)) + .transpose() + .map_err(|err| err.into()) + } + + /// Writes single record. + pub fn put( + &self, + key: &impl KeyCodec, + value: &impl ValueCodec, + ) -> anyhow::Result<()> { + // Not necessary to use a batch, but we'd like a central place to bump counters. + // Used in tests only anyway. + let mut batch = SchemaBatch::new(); + batch.put::(key, value)?; + self.write_schemas(batch) + } + + /// Delete a single key from the database. + pub fn delete(&self, key: &impl KeyCodec) -> anyhow::Result<()> { + // Not necessary to use a batch, but we'd like a central place to bump counters. + // Used in tests only anyway. + let mut batch = SchemaBatch::new(); + batch.delete::(key)?; + self.write_schemas(batch) + } + + /// Removes the database entries in the range `["from", "to")` using default write options. + /// + /// Note that this operation will be done lexicographic on the *encoding* of the seek keys. It is + /// up to the table creator to ensure that the lexicographic ordering of the encoded seek keys matches the + /// logical ordering of the type. + pub fn delete_range( + &self, + from: &impl SeekKeyEncoder, + to: &impl SeekKeyEncoder, + ) -> anyhow::Result<()> { + let cf_handle = self.get_cf_handle(S::COLUMN_FAMILY_NAME)?; + let from = from.encode_seek_key()?; + let to = to.encode_seek_key()?; + self.inner.delete_range_cf(cf_handle, from, to)?; + Ok(()) + } + + fn iter_with_direction( + &self, + opts: ReadOptions, + direction: ScanDirection, + ) -> anyhow::Result> { + let cf_handle = self.get_cf_handle(S::COLUMN_FAMILY_NAME)?; + Ok(SchemaIterator::new( + self.inner.raw_iterator_cf_opt(cf_handle, opts), + direction, + )) + } + + /// Returns a forward [`SchemaIterator`] on a certain schema with the default read options. + pub fn iter(&self) -> anyhow::Result> { + self.iter_with_direction::(Default::default(), ScanDirection::Forward) + } + + /// Returns a [`RawDbReverseIterator`] which allows to iterate over raw values, backwards + pub fn raw_iter(&self) -> anyhow::Result { + let cf_handle = self.get_cf_handle(S::COLUMN_FAMILY_NAME)?; + Ok(RawDbReverseIterator::new( + self.inner + .raw_iterator_cf_opt(cf_handle, Default::default()), + )) + } + + /// Returns a forward [`SchemaIterator`] on a certain schema with the provided read options. + pub fn iter_with_opts( + &self, + opts: ReadOptions, + ) -> anyhow::Result> { + self.iter_with_direction::(opts, ScanDirection::Forward) + } + + /// Writes a group of records wrapped in a [`SchemaBatch`]. 
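Both `put` and `delete` above are thin wrappers over this method; a hedged sketch of the intended batch usage, reusing the illustrative `PersonAgeByName` schema from the earlier sketch:

```rust
use rockbound::{SchemaBatch, DB};

// `PersonAgeByName` is the illustrative schema defined in the sketch near the top of this file.
fn commit_atomically(db: &DB) -> anyhow::Result<()> {
    let mut batch = SchemaBatch::new();
    batch.put::<PersonAgeByName>(&"alice".to_string(), &33)?;
    batch.put::<PersonAgeByName>(&"bob".to_string(), &41)?;
    batch.delete::<PersonAgeByName>(&"carol".to_string())?;
    // All staged operations land in RocksDB in one synchronous WriteBatch.
    db.write_schemas(batch)
}
```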
+ pub fn write_schemas(&self, batch: SchemaBatch) -> anyhow::Result<()> { + let _timer = ROCKBOUND_BATCH_COMMIT_LATENCY_SECONDS + .with_label_values(&[self.name]) + .start_timer(); + let mut db_batch = rocksdb::WriteBatch::default(); + for (cf_name, rows) in batch.last_writes.iter() { + let cf_handle = self.get_cf_handle(cf_name)?; + for (key, operation) in rows { + match operation { + Operation::Put { value } => db_batch.put_cf(cf_handle, key, value), + Operation::Delete => db_batch.delete_cf(cf_handle, key), + } + } + } + let serialized_size = db_batch.size_in_bytes(); + + self.inner.write_opt(db_batch, &default_write_options())?; + + // Bump counters only after DB write succeeds. + for (cf_name, rows) in batch.last_writes.iter() { + for (key, operation) in rows { + match operation { + Operation::Put { value } => { + ROCKBOUND_PUT_BYTES + .with_label_values(&[cf_name]) + .observe((key.len() + value.len()) as f64); + } + Operation::Delete => { + ROCKBOUND_DELETES.with_label_values(&[cf_name]).inc(); + } + } + } + } + ROCKBOUND_BATCH_COMMIT_BYTES + .with_label_values(&[self.name]) + .observe(serialized_size as f64); + + Ok(()) + } + + fn get_cf_handle(&self, cf_name: &str) -> anyhow::Result<&rocksdb::ColumnFamily> { + self.inner.cf_handle(cf_name).ok_or_else(|| { + format_err!( + "DB::cf_handle not found for column family name: {}", + cf_name + ) + }) + } + + /// Flushes [MemTable](https://github.com/facebook/rocksdb/wiki/MemTable) data. + /// This is only used for testing `get_approximate_sizes_cf` in unit tests. + pub fn flush_cf(&self, cf_name: &str) -> anyhow::Result<()> { + Ok(self.inner.flush_cf(self.get_cf_handle(cf_name)?)?) + } + + /// Returns the current RocksDB property value for the provided column family name + /// and property name. + pub fn get_property(&self, cf_name: &str, property_name: &str) -> anyhow::Result { + self.inner + .property_int_value_cf(self.get_cf_handle(cf_name)?, property_name)? + .ok_or_else(|| { + format_err!( + "Unable to get property \"{}\" of column family \"{}\".", + property_name, + cf_name, + ) + }) + } + + /// Creates new physical DB checkpoint in directory specified by `path`. + pub fn create_checkpoint>(&self, path: P) -> anyhow::Result<()> { + rocksdb::checkpoint::Checkpoint::new(&self.inner)?.create_checkpoint(path)?; + Ok(()) + } +} + +/// Readability alias for a key in the DB. +pub type SchemaKey = Vec; +/// Readability alias for a value in the DB. +pub type SchemaValue = Vec; + +#[cfg_attr(feature = "arbitrary", derive(proptest_derive::Arbitrary))] +#[derive(Debug, PartialEq, Eq, Hash, Clone)] +/// Represents operation written to the database +pub enum Operation { + /// Writing a value to the DB. + Put { + /// Value to write + value: SchemaValue, + }, + /// Deleting a value + Delete, +} + +/// An error that occurred during (de)serialization of a [`Schema`]'s keys or +/// values. +#[derive(Error, Debug)] +pub enum CodecError { + /// Unable to deserialize a key because it has a different length than + /// expected. + #[error("Invalid key length. Expected {expected:}, got {got:}")] + #[allow(missing_docs)] // The fields' names are self-explanatory. + InvalidKeyLength { expected: usize, got: usize }, + /// Some other error occurred when (de)serializing a key or value. Inspect + /// the inner [`anyhow::Error`] for more details. + #[error(transparent)] + Wrapped(#[from] anyhow::Error), + /// I/O error. + #[error(transparent)] + Io(#[from] std::io::Error), +} + +/// For now we always use synchronous writes. 
This makes sure that once the operation returns +/// `Ok(())` the data is persisted even if the machine crashes. In the future we might consider +/// selectively turning this off for some non-critical writes to improve performance. +fn default_write_options() -> rocksdb::WriteOptions { + let mut opts = rocksdb::WriteOptions::default(); + opts.set_sync(true); + opts +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_db_debug_output() { + let tmpdir = tempfile::tempdir().unwrap(); + let column_families = vec![DEFAULT_COLUMN_FAMILY_NAME]; + + let mut db_opts = rocksdb::Options::default(); + db_opts.create_if_missing(true); + db_opts.create_missing_column_families(true); + + let db = DB::open(tmpdir.path(), "test_db_debug", column_families, &db_opts) + .expect("Failed to open DB."); + + let db_debug = format!("{:?}", db); + assert!(db_debug.contains("test_db_debug")); + assert!(db_debug.contains(tmpdir.path().to_str().unwrap())); + } +} diff --git a/src/metrics.rs b/src/metrics.rs new file mode 100644 index 0000000..a61de31 --- /dev/null +++ b/src/metrics.rs @@ -0,0 +1,112 @@ +// Copyright (c) Aptos +// SPDX-License-Identifier: Apache-2.0 + +use once_cell::sync::Lazy; +use prometheus::{ + exponential_buckets, register_histogram_vec, register_int_counter_vec, HistogramVec, + IntCounterVec, +}; + +pub static ROCKBOUND_ITER_LATENCY_SECONDS: Lazy = Lazy::new(|| { + register_histogram_vec!( + // metric name + "rockbound_iter_latency_seconds", + // metric description + "Schemadb iter latency in seconds", + // metric labels (dimensions) + &["cf_name"], + exponential_buckets(/*start=*/ 1e-6, /*factor=*/ 2.0, /*count=*/ 22).unwrap(), + ) + .unwrap() +}); + +pub static ROCKBOUND_ITER_BYTES: Lazy = Lazy::new(|| { + register_histogram_vec!( + // metric name + "rockbound_iter_bytes", + // metric description + "Schemadb iter size in bytes", + // metric labels (dimensions) + &["cf_name"] + ) + .unwrap() +}); + +pub static ROCKBOUND_GET_LATENCY_SECONDS: Lazy = Lazy::new(|| { + register_histogram_vec!( + // metric name + "rockbound_get_latency_seconds", + // metric description + "Schemadb get latency in seconds", + // metric labels (dimensions) + &["cf_name"], + exponential_buckets(/*start=*/ 1e-6, /*factor=*/ 2.0, /*count=*/ 22).unwrap(), + ) + .unwrap() +}); + +pub static ROCKBOUND_GET_BYTES: Lazy = Lazy::new(|| { + register_histogram_vec!( + // metric name + "rockbound_get_bytes", + // metric description + "Schemadb get call returned data size in bytes", + // metric labels (dimensions) + &["cf_name"] + ) + .unwrap() +}); + +pub static ROCKBOUND_BATCH_COMMIT_LATENCY_SECONDS: Lazy = Lazy::new(|| { + register_histogram_vec!( + // metric name + "rockbound_batch_commit_latency_seconds", + // metric description + "Schemadb schema batch commit latency in seconds", + // metric labels (dimensions) + &["db_name"], + exponential_buckets(/*start=*/ 1e-3, /*factor=*/ 2.0, /*count=*/ 20).unwrap(), + ) + .unwrap() +}); + +pub static ROCKBOUND_BATCH_COMMIT_BYTES: Lazy = Lazy::new(|| { + register_histogram_vec!( + // metric name + "rockbound_batch_commit_bytes", + // metric description + "Schemadb schema batch commit size in bytes", + // metric labels (dimensions) + &["db_name"] + ) + .unwrap() +}); + +pub static ROCKBOUND_PUT_BYTES: Lazy = Lazy::new(|| { + register_histogram_vec!( + // metric name + "sov_schema_db_put_bytes", + // metric description + "sov_schema_db put call puts data size in bytes", + // metric labels (dimensions) + &["cf_name"] + ) + .unwrap() +}); + +pub static ROCKBOUND_DELETES: Lazy 
= Lazy::new(|| { + register_int_counter_vec!("storage_deletes", "Storage delete calls", &["cf_name"]).unwrap() +}); + +pub static ROCKBOUND_BATCH_PUT_LATENCY_SECONDS: Lazy = Lazy::new(|| { + register_histogram_vec!( + // metric name + "sov_schema_db_batch_put_latency_seconds", + // metric description + "sov_schema_db schema batch put latency in seconds", + // metric labels (dimensions) + &["db_name"], + exponential_buckets(/*start=*/ 1e-3, /*factor=*/ 2.0, /*count=*/ 20).unwrap(), + ) + .unwrap() +}); diff --git a/src/schema.rs b/src/schema.rs new file mode 100644 index 0000000..19b552a --- /dev/null +++ b/src/schema.rs @@ -0,0 +1,161 @@ +// Adapted from Aptos::storage::schemadb; +// While most of the Sovereign SDK will be available under both +// MIT and APACHE 2.0 licenses, this file is +// licensed under APACHE 2.0 only. + +//! A type-safe interface over [`DB`](crate::DB) column families. + +use std::fmt::Debug; + +use crate::CodecError; + +/// Crate users are expected to know [column +/// family](https://github.com/EighteenZi/rocksdb_wiki/blob/master/Column-Families.md) +/// names beforehand, so they can have `static` lifetimes. +pub type ColumnFamilyName = &'static str; + +/// A [`Schema`] is a type-safe interface over a specific column family in a +/// [`DB`](crate::DB). It always a key type ([`KeyCodec`]) and a value type ([`ValueCodec`]). +pub trait Schema: Debug + Send + Sync + 'static + Sized { + /// The column family name associated with this struct. + /// Note: all schemas within the same SchemaDB must have distinct column family names. + const COLUMN_FAMILY_NAME: ColumnFamilyName; + + /// Type of the key. + type Key: KeyCodec; + + /// Type of the value. + type Value: ValueCodec; +} + +/// A [`core::result::Result`] alias with [`CodecError`] as the error type. +pub type Result = core::result::Result; + +/// This trait defines a type that can serve as a [`Schema::Key`]. +/// +/// [`KeyCodec`] is a marker trait with a blanket implementation for all types +/// that are both [`KeyEncoder`] and [`KeyDecoder`]. Having [`KeyEncoder`] and +/// [`KeyDecoder`] as two standalone traits on top of [`KeyCodec`] may seem +/// superfluous, but it allows for zero-copy key encoding under specific +/// circumstances. E.g.: +/// +/// ```rust +/// use anyhow::Context; +/// +/// use rockbound::define_schema; +/// use rockbound::schema::{ +/// Schema, KeyEncoder, KeyDecoder, ValueCodec, Result, +/// }; +/// +/// define_schema!(PersonAgeByName, String, u32, "person_age_by_name"); +/// +/// impl KeyEncoder for String { +/// fn encode_key(&self) -> Result> { +/// Ok(self.as_bytes().to_vec()) +/// } +/// } +/// +/// /// What about encoding a `&str`, though? We'd have to copy it into a +/// /// `String` first, which is not ideal. But we can do better: +/// impl<'a> KeyEncoder for &'a str { +/// fn encode_key(&self) -> Result> { +/// Ok(self.as_bytes().to_vec()) +/// } +/// } +/// +/// impl KeyDecoder for String { +/// fn decode_key(data: &[u8]) -> Result { +/// Ok(String::from_utf8(data.to_vec()).context("Can't read key")?) 
+/// } +/// } +/// +/// impl ValueCodec for u32 { +/// fn encode_value(&self) -> Result> { +/// Ok(self.to_le_bytes().to_vec()) +/// } +/// +/// fn decode_value(data: &[u8]) -> Result { +/// let mut buf = [0u8; 4]; +/// buf.copy_from_slice(data); +/// Ok(u32::from_le_bytes(buf)) +/// } +/// } +/// ``` +pub trait KeyCodec: KeyEncoder + KeyDecoder {} + +impl KeyCodec for T where T: KeyEncoder + KeyDecoder {} + +/// Implementors of this trait can be used to encode keys in the given [`Schema`]. +pub trait KeyEncoder: Sized + PartialEq + Debug { + /// Converts `self` to bytes to be stored in RocksDB. + fn encode_key(&self) -> Result>; +} + +/// Implementors of this trait can be used to decode keys in the given [`Schema`]. +pub trait KeyDecoder: Sized + PartialEq + Debug { + /// Converts bytes fetched from RocksDB to `Self`. + fn decode_key(data: &[u8]) -> Result; +} + +/// This trait defines a type that can serve as a [`Schema::Value`]. +pub trait ValueCodec: Sized + PartialEq + Debug { + /// Converts `self` to bytes to be stored in DB. + fn encode_value(&self) -> Result>; + /// Converts bytes fetched from DB to `Self`. + fn decode_value(data: &[u8]) -> Result; +} + +/// A utility macro to define [`Schema`] implementors. You must specify the +/// [`Schema`] implementor's name, the key type, the value type, and the column +/// family name. +/// +/// # Example +/// +/// ```rust +/// use anyhow::Context; +/// +/// use rockbound::define_schema; +/// use rockbound::schema::{ +/// Schema, KeyEncoder, KeyDecoder, ValueCodec, Result, +/// }; +/// +/// define_schema!(PersonAgeByName, String, u32, "person_age_by_name"); +/// +/// impl KeyEncoder for String { +/// fn encode_key(&self) -> Result> { +/// Ok(self.as_bytes().to_vec()) +/// } +/// } +/// +/// impl KeyDecoder for String { +/// fn decode_key(data: &[u8]) -> Result { +/// Ok(String::from_utf8(data.to_vec()).context("Can't read key")?) +/// } +/// } +/// +/// impl ValueCodec for u32 { +/// fn encode_value(&self) -> Result> { +/// Ok(self.to_le_bytes().to_vec()) +/// } +/// +/// fn decode_value(data: &[u8]) -> Result { +/// let mut buf = [0u8; 4]; +/// buf.copy_from_slice(data); +/// Ok(u32::from_le_bytes(buf)) +/// } +/// } +/// ``` +#[macro_export] +macro_rules! define_schema { + ($schema_type:ident, $key_type:ty, $value_type:ty, $cf_name:expr) => { + #[derive(Debug)] + pub(crate) struct $schema_type; + + impl $crate::schema::Schema for $schema_type { + type Key = $key_type; + type Value = $value_type; + + const COLUMN_FAMILY_NAME: $crate::schema::ColumnFamilyName = $cf_name; + } + }; +} diff --git a/src/schema_batch.rs b/src/schema_batch.rs new file mode 100644 index 0000000..d4e82d0 --- /dev/null +++ b/src/schema_batch.rs @@ -0,0 +1,140 @@ +use std::collections::{btree_map, BTreeMap, HashMap}; +use std::iter::Rev; + +use crate::metrics::ROCKBOUND_BATCH_PUT_LATENCY_SECONDS; +use crate::schema::{ColumnFamilyName, KeyCodec, ValueCodec}; +use crate::{Operation, Schema, SchemaKey}; + +// [`SchemaBatch`] holds a collection of updates that can be applied to a DB +/// ([`Schema`]) atomically. The updates will be applied in the order in which +/// they are added to the [`SchemaBatch`]. +#[derive(Debug, Default)] +pub struct SchemaBatch { + // Temporary pub(crate), before iterator is done + pub(crate) last_writes: HashMap>, +} + +impl SchemaBatch { + /// Creates an empty batch. + pub fn new() -> Self { + Self::default() + } + + /// Adds an insert/update operation to the batch. 
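Because each column family's staged writes live in a map keyed by the encoded key, only the last operation recorded for a given key survives; `test_single_schema_batch` in `tests/db_test.rs` below relies on exactly this. A hedged, schema-generic sketch:

```rust
use rockbound::schema::{KeyCodec, ValueCodec};
use rockbound::{Schema, SchemaBatch, DB};

// Illustrative helper: the delete, added last, replaces the earlier put for `key`.
fn put_then_delete<S: Schema>(
    db: &DB,
    key: &impl KeyCodec<S>,
    value: &impl ValueCodec<S>,
) -> anyhow::Result<()> {
    let mut batch = SchemaBatch::new();
    batch.put::<S>(key, value)?;
    batch.delete::<S>(key)?; // overwrites the staged put for this key
    db.write_schemas(batch) // `key` is absent afterwards
}
```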
+ pub fn put( + &mut self, + key: &impl KeyCodec, + value: &impl ValueCodec, + ) -> anyhow::Result<()> { + let _timer = ROCKBOUND_BATCH_PUT_LATENCY_SECONDS + .with_label_values(&["unknown"]) + .start_timer(); + let key = key.encode_key()?; + let put_operation = Operation::Put { + value: value.encode_value()?, + }; + self.insert_operation::(key, put_operation); + Ok(()) + } + + /// Adds a delete operation to the batch. + pub fn delete(&mut self, key: &impl KeyCodec) -> anyhow::Result<()> { + let key = key.encode_key()?; + self.insert_operation::(key, Operation::Delete); + + Ok(()) + } + + fn insert_operation(&mut self, key: SchemaKey, operation: Operation) { + let column_writes = self.last_writes.entry(S::COLUMN_FAMILY_NAME).or_default(); + column_writes.insert(key, operation); + } + + pub(crate) fn read( + &self, + key: &impl KeyCodec, + ) -> anyhow::Result> { + let key = key.encode_key()?; + if let Some(column_writes) = self.last_writes.get(&S::COLUMN_FAMILY_NAME) { + return Ok(column_writes.get(&key)); + } + Ok(None) + } + + /// Iterate over all the writes in the batch for a given column family in reversed lexicographic order + /// Returns None column family name does not have any writes + pub fn iter( + &self, + ) -> SchemaBatchIterator<'_, S, Rev>> { + let some_rows = self.last_writes.get(&S::COLUMN_FAMILY_NAME); + SchemaBatchIterator { + inner: some_rows.map(|rows| rows.iter().rev()), + _phantom_schema: std::marker::PhantomData, + } + } + + /// Return iterator that iterates from operations with largest_key == upper_bound backwards + pub fn iter_range( + &self, + upper_bound: SchemaKey, + ) -> SchemaBatchIterator<'_, S, Rev>> { + let some_rows = self.last_writes.get(&S::COLUMN_FAMILY_NAME); + SchemaBatchIterator { + inner: some_rows.map(|rows| rows.range(..=upper_bound).rev()), + _phantom_schema: std::marker::PhantomData, + } + } + + pub(crate) fn merge(&mut self, other: SchemaBatch) { + for (cf_name, other_cf_map) in other.last_writes { + let self_cf_map = self.last_writes.entry(cf_name).or_default(); + + for (key, operation) in other_cf_map { + self_cf_map.insert(key, operation); + } + } + } +} + +/// Iterator over [`SchemaBatch`] for a given column family in reversed lexicographic order +pub struct SchemaBatchIterator<'a, S, I> +where + S: Schema, + I: Iterator, +{ + inner: Option, + _phantom_schema: std::marker::PhantomData, +} + +impl<'a, S, I> Iterator for SchemaBatchIterator<'a, S, I> +where + S: Schema, + I: Iterator, +{ + type Item = I::Item; + + fn next(&mut self) -> Option { + self.inner.as_mut().and_then(|inner| inner.next()) + } +} + +#[cfg(feature = "arbitrary")] +impl proptest::arbitrary::Arbitrary for SchemaBatch { + type Parameters = &'static [ColumnFamilyName]; + fn arbitrary_with(columns: Self::Parameters) -> Self::Strategy { + use proptest::prelude::any; + use proptest::strategy::Strategy; + + proptest::collection::vec(any::>(), columns.len()) + .prop_map::(|vec_vec_write_ops| { + let mut rows = HashMap::new(); + for (col, write_op) in columns.iter().zip(vec_vec_write_ops.into_iter()) { + rows.insert(*col, write_op); + } + SchemaBatch { last_writes: rows } + }) + .boxed() + } + + type Strategy = proptest::strategy::BoxedStrategy; +} diff --git a/src/snapshot.rs b/src/snapshot.rs new file mode 100644 index 0000000..ee4292c --- /dev/null +++ b/src/snapshot.rs @@ -0,0 +1,508 @@ +//! 
Snapshot related logic + +use std::collections::btree_map; +use std::iter::Rev; +use std::sync::{Arc, LockResult, Mutex, RwLock, RwLockReadGuard}; + +use crate::schema::{KeyCodec, KeyDecoder, ValueCodec}; +use crate::schema_batch::SchemaBatchIterator; +use crate::{Operation, Schema, SchemaBatch, SchemaKey, SchemaValue, SeekKeyEncoder}; + +/// Id of database snapshot +pub type SnapshotId = u64; + +/// A trait to make nested calls to several [`SchemaBatch`]s and eventually [`crate::DB`] +pub trait QueryManager { + /// Iterator over key-value pairs in reverse lexicographic order in given [`Schema`] + type Iter<'a, S: Schema>: Iterator + where + Self: 'a; + /// Iterator with given range + type RangeIter<'a, S: Schema>: Iterator + where + Self: 'a; + /// Get a value from parents of given [`SnapshotId`] + /// In case of unknown [`SnapshotId`] return `Ok(None)` + fn get( + &self, + snapshot_id: SnapshotId, + key: &impl KeyCodec, + ) -> anyhow::Result>; + + /// Returns an iterator over all key-value pairs in given [`Schema`] in reverse lexicographic order + /// Starting from given [`SnapshotId`] + fn iter(&self, snapshot_id: SnapshotId) -> anyhow::Result>; + /// Returns an iterator over all key-value pairs in given [`Schema`] in reverse lexicographic order + /// Starting from given [`SnapshotId`], where largest returned key will be less or equal to `upper_bound` + fn iter_range( + &self, + snapshot_id: SnapshotId, + upper_bound: SchemaKey, + ) -> anyhow::Result>; +} + +/// Simple wrapper around `RwLock` that only allows read access. +#[derive(Debug)] +pub struct ReadOnlyLock { + lock: Arc>, +} + +impl ReadOnlyLock { + /// Create new [`ReadOnlyLock`] from [`Arc>`]. + pub fn new(lock: Arc>) -> Self { + Self { lock } + } + + /// Acquires a read lock on the underlying `RwLock`. + pub fn read(&self) -> LockResult> { + self.lock.read() + } +} + +impl From>> for ReadOnlyLock { + fn from(value: Arc>) -> Self { + Self::new(value) + } +} + +/// Wrapper around [`QueryManager`] that allows to read from snapshots +#[derive(Debug)] +pub struct DbSnapshot { + id: SnapshotId, + cache: Mutex, + parents_manager: ReadOnlyLock, +} + +impl DbSnapshot { + /// Create new [`DbSnapshot`] + pub fn new(id: SnapshotId, manager: ReadOnlyLock) -> Self { + Self { + id, + cache: Mutex::new(SchemaBatch::default()), + parents_manager: manager, + } + } + + /// Store a value in snapshot + pub fn put( + &self, + key: &impl KeyCodec, + value: &impl ValueCodec, + ) -> anyhow::Result<()> { + self.cache + .lock() + .expect("Local SchemaBatch lock must not be poisoned") + .put(key, value) + } + + /// Delete given key from snapshot + pub fn delete(&self, key: &impl KeyCodec) -> anyhow::Result<()> { + self.cache + .lock() + .expect("Local SchemaBatch lock must not be poisoned") + .delete(key) + } + + /// Writes many operations at once, atomically + pub fn write_many(&self, batch: SchemaBatch) -> anyhow::Result<()> { + let mut cache = self + .cache + .lock() + .expect("Local SchemaBatch lock must not be poisoned"); + cache.merge(batch); + Ok(()) + } +} + +impl DbSnapshot { + /// Get a value from current snapshot, its parents or underlying database + pub fn read(&self, key: &impl KeyCodec) -> anyhow::Result> { + // Some(Operation) means that key was touched, + // but in case of deletion we early return None + // Only in case of not finding operation for key, + // we go deeper + + // Hold local cache lock explicitly, so reads are atomic + let local_cache = self + .cache + .lock() + .expect("SchemaBatch lock should not be poisoned"); + + // 1. 
Check in cache + if let Some(operation) = local_cache.read(key)? { + return decode_operation::(operation); + } + + // 2. Check parent + let parent = self + .parents_manager + .read() + .expect("Parent lock must not be poisoned"); + parent.get::(self.id, key) + } + + /// Get value of largest key written value for given [`Schema`] + pub fn get_largest(&self) -> anyhow::Result> { + let local_cache = self + .cache + .lock() + .expect("SchemaBatch lock must not be poisoned"); + let local_cache_iter = local_cache.iter::(); + + let parent = self + .parents_manager + .read() + .expect("Parent lock must not be poisoned"); + + let parent_iter = parent.iter::(self.id)?; + + let mut combined_iter: SnapshotIter<'_, S, _, _> = SnapshotIter { + local_cache_iter: local_cache_iter.peekable(), + parent_iter: parent_iter.peekable(), + }; + + if let Some((key, value)) = combined_iter.next() { + let key = S::Key::decode_key(&key)?; + let value = S::Value::decode_value(&value)?; + return Ok(Some((key, value))); + } + + Ok(None) + } + + /// Get largest value in [`Schema`] that is smaller or equal than give `seek_key` + pub fn get_prev( + &self, + seek_key: &impl SeekKeyEncoder, + ) -> anyhow::Result> { + let seek_key = seek_key.encode_seek_key()?; + let local_cache = self + .cache + .lock() + .expect("Local cache lock must not be poisoned"); + let local_cache_iter = local_cache.iter_range::(seek_key.clone()); + + let parent = self + .parents_manager + .read() + .expect("Parent snapshots lock must not be poisoned"); + let parent_iter = parent.iter_range::(self.id, seek_key.clone())?; + + let mut combined_iter: SnapshotIter<'_, S, _, _> = SnapshotIter { + local_cache_iter: local_cache_iter.peekable(), + parent_iter: parent_iter.peekable(), + }; + + if let Some((key, value)) = combined_iter.next() { + let key = S::Key::decode_key(&key)?; + let value = S::Value::decode_value(&value)?; + return Ok(Some((key, value))); + } + Ok(None) + } +} + +struct SnapshotIter<'a, S, LocalIter, ParentIter> +where + S: Schema, + LocalIter: Iterator, + ParentIter: Iterator, +{ + local_cache_iter: std::iter::Peekable>, + parent_iter: std::iter::Peekable, +} + +impl<'a, S, LocalIter, ParentIter> Iterator for SnapshotIter<'a, S, LocalIter, ParentIter> +where + S: Schema, + LocalIter: Iterator, + ParentIter: Iterator, +{ + type Item = (SchemaKey, SchemaValue); + + fn next(&mut self) -> Option { + loop { + let local_cache_peeked = self.local_cache_iter.peek(); + let parent_peeked = self.parent_iter.peek(); + + match (local_cache_peeked, parent_peeked) { + // Both iterators exhausted + (None, None) => break, + // Parent exhausted (just like me on friday) + (Some(&(key, operation)), None) => { + self.local_cache_iter.next(); + let next = put_or_none(key, operation); + if next.is_none() { + continue; + } + return next; + } + // Local exhausted + (None, Some((_key, _value))) => { + return self.parent_iter.next(); + } + // Both are active, need to compare keys + (Some(&(local_key, local_operation)), Some((parent_key, _parent_value))) => { + return if local_key < parent_key { + self.parent_iter.next() + } else { + // Local is preferable, as it is the latest + // But both operators must succeed + if local_key == parent_key { + self.parent_iter.next(); + } + self.local_cache_iter.next(); + let next = put_or_none(local_key, local_operation); + if next.is_none() { + continue; + } + next + }; + } + } + } + + None + } +} + +/// Read only version of [`DbSnapshot`], for usage inside [`QueryManager`] +pub struct ReadOnlyDbSnapshot { + id: SnapshotId, + 
cache: SchemaBatch, +} + +impl ReadOnlyDbSnapshot { + /// Get value from its own cache + pub fn get(&self, key: &impl KeyCodec) -> anyhow::Result> { + self.cache.read(key) + } + + /// Get id of this Snapshot + pub fn get_id(&self) -> SnapshotId { + self.id + } + + /// Iterate over all operations in snapshot in reversed lexicographic order + pub fn iter( + &self, + ) -> SchemaBatchIterator<'_, S, Rev>> { + self.cache.iter::() + } + + /// Iterate over all operations in snapshot in reversed lexicographical order, starting from `upper_bound` + pub fn iter_range( + &self, + upper_bound: SchemaKey, + ) -> SchemaBatchIterator<'_, S, Rev>> { + self.cache.iter_range::(upper_bound) + } +} + +impl From> for ReadOnlyDbSnapshot { + fn from(snapshot: DbSnapshot) -> Self { + Self { + id: snapshot.id, + cache: snapshot + .cache + .into_inner() + .expect("SchemaBatch lock must not be poisoned"), + } + } +} + +impl From for SchemaBatch { + fn from(value: ReadOnlyDbSnapshot) -> Self { + value.cache + } +} + +fn decode_operation(operation: &Operation) -> anyhow::Result> { + match operation { + Operation::Put { value } => { + let value = S::Value::decode_value(value)?; + Ok(Some(value)) + } + Operation::Delete => Ok(None), + } +} + +fn put_or_none(key: &SchemaKey, operation: &Operation) -> Option<(SchemaKey, SchemaValue)> { + if let Operation::Put { value } = operation { + return Some((key.to_vec(), value.to_vec())); + } + None +} + +/// QueryManager, which never returns any values +#[derive(Clone, Debug, Default)] +pub struct NoopQueryManager; + +impl QueryManager for NoopQueryManager { + type Iter<'a, S: Schema> = std::iter::Empty<(SchemaKey, SchemaValue)>; + type RangeIter<'a, S: Schema> = std::iter::Empty<(SchemaKey, SchemaValue)>; + + fn get( + &self, + _snapshot_id: SnapshotId, + _key: &impl KeyCodec, + ) -> anyhow::Result> { + Ok(None) + } + + fn iter(&self, _snapshot_id: SnapshotId) -> anyhow::Result> { + Ok(std::iter::empty()) + } + + fn iter_range( + &self, + _snapshot_id: SnapshotId, + _upper_bound: SchemaKey, + ) -> anyhow::Result> { + Ok(std::iter::empty()) + } +} + +/// Snapshot manager, where all snapshots are collapsed into 1 +#[derive(Default)] +pub struct SingleSnapshotQueryManager { + cache: SchemaBatch, +} + +impl SingleSnapshotQueryManager { + /// Adding new snapshot. It will override any existing data on key match + pub fn add_snapshot(&mut self, snapshot: ReadOnlyDbSnapshot) { + let ReadOnlyDbSnapshot { + cache: new_data, .. + } = snapshot; + + self.cache.merge(new_data); + } +} + +impl QueryManager for SingleSnapshotQueryManager { + type Iter<'a, S: Schema> = std::vec::IntoIter<(SchemaKey, SchemaValue)>; + type RangeIter<'a, S: Schema> = std::vec::IntoIter<(SchemaKey, SchemaValue)>; + + fn get( + &self, + _snapshot_id: SnapshotId, + key: &impl KeyCodec, + ) -> anyhow::Result> { + if let Some(Operation::Put { value }) = self.cache.read(key)? 
{ + let value = S::Value::decode_value(value)?; + return Ok(Some(value)); + } + Ok(None) + } + + fn iter(&self, _snapshot_id: SnapshotId) -> anyhow::Result> { + let collected: Vec<(SchemaKey, SchemaValue)> = self + .cache + .iter::() + .filter_map(|(k, op)| match op { + Operation::Put { value } => Some((k.to_vec(), value.to_vec())), + Operation::Delete => None, + }) + .collect(); + + Ok(collected.into_iter()) + } + + fn iter_range( + &self, + _snapshot_id: SnapshotId, + upper_bound: SchemaKey, + ) -> anyhow::Result> { + let collected: Vec<(SchemaKey, SchemaValue)> = self + .cache + .iter_range::(upper_bound) + .filter_map(|(k, op)| match op { + Operation::Put { value } => Some((k.to_vec(), value.to_vec())), + Operation::Delete => None, + }) + .collect(); + + Ok(collected.into_iter()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::define_schema; + use crate::schema::KeyEncoder; + use crate::test::{TestCompositeField, TestField}; + + define_schema!(TestSchema, TestCompositeField, TestField, "TestCF"); + + fn encode_key(key: &TestCompositeField) -> Vec { + >::encode_key(key).unwrap() + } + + fn encode_value(value: &TestField) -> Vec { + >::encode_value(value).unwrap() + } + + #[test] + fn test_db_snapshot_iterator_empty() { + let local_cache = SchemaBatch::new(); + let parent_values = SchemaBatch::new(); + + let manager = SingleSnapshotQueryManager { + cache: parent_values, + }; + + let local_cache_iter = local_cache.iter::().peekable(); + let manager_iter = manager.iter::(0).unwrap().peekable(); + + let snapshot_iter = SnapshotIter::<'_, TestSchema, _, _> { + local_cache_iter, + parent_iter: manager_iter, + }; + + let values: Vec<(SchemaKey, SchemaValue)> = snapshot_iter.collect(); + + assert!(values.is_empty()); + } + + #[test] + fn test_db_snapshot_iterator_values() { + let k1 = TestCompositeField(0, 1, 0); + let k2 = TestCompositeField(0, 1, 2); + let k3 = TestCompositeField(3, 1, 0); + let k4 = TestCompositeField(3, 2, 0); + + let mut parent_values = SchemaBatch::new(); + parent_values.put::(&k2, &TestField(2)).unwrap(); + parent_values.put::(&k1, &TestField(1)).unwrap(); + parent_values.put::(&k4, &TestField(4)).unwrap(); + parent_values.put::(&k3, &TestField(3)).unwrap(); + + let mut local_cache = SchemaBatch::new(); + local_cache.delete::(&k3).unwrap(); + local_cache.put::(&k1, &TestField(10)).unwrap(); + local_cache.put::(&k2, &TestField(20)).unwrap(); + + let manager = SingleSnapshotQueryManager { + cache: parent_values, + }; + + let local_cache_iter = local_cache.iter::().peekable(); + let manager_iter = manager.iter::(0).unwrap().peekable(); + + let snapshot_iter = SnapshotIter::<'_, TestSchema, _, _> { + local_cache_iter, + parent_iter: manager_iter, + }; + + let actual_values: Vec<(SchemaKey, SchemaValue)> = snapshot_iter.collect(); + let expected_values = vec![ + (encode_key(&k4), encode_value(&TestField(4))), + (encode_key(&k2), encode_value(&TestField(20))), + (encode_key(&k1), encode_value(&TestField(10))), + ]; + + assert_eq!(expected_values, actual_values); + } +} diff --git a/src/test.rs b/src/test.rs new file mode 100644 index 0000000..fc88ee5 --- /dev/null +++ b/src/test.rs @@ -0,0 +1,116 @@ +//! Helpers structures for testing, such as fields. 
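For orientation, a hedged sketch of the byte layout these helpers produce. It assumes, as the external tests below do, that the test codecs are generic over the schema; `ExampleSchema` and its column family name are illustrative only.

```rust
use rockbound::define_schema;
use rockbound::schema::KeyEncoder;
use rockbound::test::{TestCompositeField, TestField};

define_schema!(ExampleSchema, TestCompositeField, TestField, "example_cf");

fn layout_example() {
    // Each field is written big-endian, so tuples sort component by component
    // under RocksDB's lexicographic byte ordering.
    let bytes = <TestCompositeField as KeyEncoder<ExampleSchema>>::encode_key(
        &TestCompositeField(1, 2, 3),
    )
    .unwrap();
    assert_eq!(
        bytes,
        vec![0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 3] // 1, 2, 3 as big-endian u32s
    );
}
```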
+ +use anyhow::Result; +use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; + +use crate::schema::{KeyDecoder, KeyEncoder, ValueCodec}; +use crate::{CodecError, Schema, SeekKeyEncoder}; + +#[derive(Debug, Eq, PartialEq, Clone)] +/// Key that composed out of tuple of r u32 +pub struct TestCompositeField(pub u32, pub u32, pub u32); + +#[derive(Debug, Eq, PartialEq, Clone, Copy)] +/// Simple value around u32 +pub struct TestField(pub u32); + +impl KeyEncoder for TestCompositeField { + fn encode_key(&self) -> Result, CodecError> { + let mut bytes = vec![]; + bytes + .write_u32::(self.0) + .map_err(|e| CodecError::Wrapped(e.into()))?; + bytes + .write_u32::(self.1) + .map_err(|e| CodecError::Wrapped(e.into()))?; + bytes + .write_u32::(self.2) + .map_err(|e| CodecError::Wrapped(e.into()))?; + Ok(bytes) + } +} + +impl KeyDecoder for TestCompositeField { + fn decode_key(data: &[u8]) -> Result { + let mut reader = std::io::Cursor::new(data); + Ok(TestCompositeField( + reader + .read_u32::() + .map_err(|e| CodecError::Wrapped(e.into()))?, + reader + .read_u32::() + .map_err(|e| CodecError::Wrapped(e.into()))?, + reader + .read_u32::() + .map_err(|e| CodecError::Wrapped(e.into()))?, + )) + } +} + +impl SeekKeyEncoder for TestCompositeField { + fn encode_seek_key(&self) -> crate::schema::Result> { + >::encode_key(self) + } +} + +impl TestField { + fn as_bytes(&self) -> Vec { + self.0.to_be_bytes().to_vec() + } + + fn from_bytes(data: &[u8]) -> std::result::Result { + let mut reader = std::io::Cursor::new(data); + Ok(TestField( + reader + .read_u32::() + .map_err(|e| CodecError::Wrapped(e.into()))?, + )) + } +} + +impl ValueCodec for TestField { + fn encode_value(&self) -> Result, CodecError> { + Ok(self.as_bytes()) + } + + fn decode_value(data: &[u8]) -> Result { + Self::from_bytes(data) + } +} + +impl KeyDecoder for TestField { + fn decode_key(data: &[u8]) -> std::result::Result { + Self::from_bytes(data) + } +} + +impl KeyEncoder for TestField { + fn encode_key(&self) -> std::result::Result, CodecError> { + Ok(self.as_bytes()) + } +} + +/// KeyPrefix over single u32 +pub struct KeyPrefix1(pub u32); + +impl SeekKeyEncoder for KeyPrefix1 { + fn encode_seek_key(&self) -> Result, CodecError> { + Ok(self.0.to_be_bytes().to_vec()) + } +} + +/// KeyPrefix over pair of u32 +pub struct KeyPrefix2(pub u32, pub u32); + +impl SeekKeyEncoder for KeyPrefix2 { + fn encode_seek_key(&self) -> Result, CodecError> { + let mut bytes = vec![]; + bytes + .write_u32::(self.0) + .map_err(|e| CodecError::Wrapped(e.into()))?; + bytes + .write_u32::(self.1) + .map_err(|e| CodecError::Wrapped(e.into()))?; + Ok(bytes) + } +} diff --git a/tests/db_test.rs b/tests/db_test.rs new file mode 100644 index 0000000..cfc728c --- /dev/null +++ b/tests/db_test.rs @@ -0,0 +1,347 @@ +// Copyright (c) Aptos +// SPDX-License-Identifier: Apache-2.0 + +use std::path::Path; + +use rockbound::schema::{ColumnFamilyName, Result}; +use rockbound::test::TestField; +use rockbound::{define_schema, Schema, SchemaBatch, DB}; +use rocksdb::DEFAULT_COLUMN_FAMILY_NAME; +use tempfile::TempDir; + +// Creating two schemas that share exactly the same structure but are stored in different column +// families. Also note that the key and value are of the same type `TestField`. By implementing +// both the `KeyCodec<>` and `ValueCodec<>` traits for both schemas, we are able to use it +// everywhere. 
+define_schema!(TestSchema1, TestField, TestField, "TestCF1"); +define_schema!(TestSchema2, TestField, TestField, "TestCF2"); + +fn get_column_families() -> Vec { + vec![ + DEFAULT_COLUMN_FAMILY_NAME, + TestSchema1::COLUMN_FAMILY_NAME, + TestSchema2::COLUMN_FAMILY_NAME, + ] +} + +fn open_db(dir: impl AsRef) -> DB { + let mut db_opts = rocksdb::Options::default(); + db_opts.create_if_missing(true); + db_opts.create_missing_column_families(true); + DB::open(dir, "test", get_column_families(), &db_opts).expect("Failed to open DB.") +} + +fn open_db_read_only(dir: &TempDir) -> DB { + DB::open_cf_readonly( + &rocksdb::Options::default(), + dir.path(), + "test", + get_column_families(), + ) + .expect("Failed to open DB.") +} + +fn open_db_as_secondary(dir: &TempDir, dir_sec: &TempDir) -> DB { + DB::open_cf_as_secondary( + &rocksdb::Options::default(), + &dir.path(), + &dir_sec.path(), + "test", + get_column_families(), + ) + .expect("Failed to open DB.") +} + +struct TestDB { + _tmpdir: TempDir, + db: DB, +} + +impl TestDB { + fn new() -> Self { + let tmpdir = tempfile::tempdir().unwrap(); + let db = open_db(&tmpdir); + + TestDB { + _tmpdir: tmpdir, + db, + } + } +} + +impl std::ops::Deref for TestDB { + type Target = DB; + + fn deref(&self) -> &Self::Target { + &self.db + } +} + +#[test] +fn test_schema_put_get() { + let db = TestDB::new(); + + // Let's put more than 256 items in each to test RocksDB's lexicographic + // ordering. + for i in 0..300 { + db.put::(&TestField(i), &TestField(i)).unwrap(); + } + for i in 100..400 { + db.put::(&TestField(i), &TestField(i + 1)) + .unwrap(); + } + + // `.get()`. + assert_eq!( + db.get::(&TestField(0)).unwrap(), + Some(TestField(0)), + ); + assert_eq!( + db.get::(&TestField(1)).unwrap(), + Some(TestField(1)), + ); + assert_eq!( + db.get::(&TestField(299)).unwrap(), + Some(TestField(299)), + ); + assert_eq!( + db.get::(&TestField(102)).unwrap(), + Some(TestField(103)), + ); + assert_eq!( + db.get::(&TestField(203)).unwrap(), + Some(TestField(204)), + ); + assert_eq!( + db.get::(&TestField(399)).unwrap(), + Some(TestField(400)), + ); + + // `collect_values()`. + assert_eq!( + collect_values::(&db), + gen_expected_values(&(100..400).map(|i| (i, i + 1)).collect::>()), + ); + + // Nonexistent keys. 
+ assert_eq!(db.get::(&TestField(300)).unwrap(), None); + assert_eq!(db.get::(&TestField(99)).unwrap(), None); + assert_eq!(db.get::(&TestField(400)).unwrap(), None); +} + +fn collect_values(db: &TestDB) -> Vec<(S::Key, S::Value)> { + let mut iter = db.iter::().expect("Failed to create iterator."); + iter.seek_to_first(); + iter.map(|res| res.map(|item| item.into_tuple())) + .collect::, anyhow::Error>>() + .unwrap() +} + +fn gen_expected_values(values: &[(u32, u32)]) -> Vec<(TestField, TestField)> { + values + .iter() + .cloned() + .map(|(x, y)| (TestField(x), TestField(y))) + .collect() +} + +#[test] +fn test_single_schema_batch() { + let db = TestDB::new(); + + let mut db_batch = SchemaBatch::new(); + db_batch + .put::(&TestField(0), &TestField(0)) + .unwrap(); + db_batch + .put::(&TestField(1), &TestField(1)) + .unwrap(); + db_batch + .put::(&TestField(2), &TestField(2)) + .unwrap(); + db_batch + .put::(&TestField(3), &TestField(3)) + .unwrap(); + db_batch.delete::(&TestField(4)).unwrap(); + db_batch.delete::(&TestField(3)).unwrap(); + db_batch + .put::(&TestField(4), &TestField(4)) + .unwrap(); + db_batch + .put::(&TestField(5), &TestField(5)) + .unwrap(); + + db.write_schemas(db_batch).unwrap(); + + assert_eq!( + collect_values::(&db), + gen_expected_values(&[(0, 0), (1, 1), (2, 2)]), + ); + assert_eq!( + collect_values::(&db), + gen_expected_values(&[(4, 4), (5, 5)]), + ); +} + +#[test] +fn test_two_schema_batches() { + let db = TestDB::new(); + + let mut db_batch1 = SchemaBatch::new(); + db_batch1 + .put::(&TestField(0), &TestField(0)) + .unwrap(); + db_batch1 + .put::(&TestField(1), &TestField(1)) + .unwrap(); + db_batch1 + .put::(&TestField(2), &TestField(2)) + .unwrap(); + db_batch1.delete::(&TestField(2)).unwrap(); + db.write_schemas(db_batch1).unwrap(); + + assert_eq!( + collect_values::(&db), + gen_expected_values(&[(0, 0), (1, 1)]), + ); + + let mut db_batch2 = SchemaBatch::new(); + db_batch2.delete::(&TestField(3)).unwrap(); + db_batch2 + .put::(&TestField(3), &TestField(3)) + .unwrap(); + db_batch2 + .put::(&TestField(4), &TestField(4)) + .unwrap(); + db_batch2 + .put::(&TestField(5), &TestField(5)) + .unwrap(); + db.write_schemas(db_batch2).unwrap(); + + assert_eq!( + collect_values::(&db), + gen_expected_values(&[(0, 0), (1, 1)]), + ); + assert_eq!( + collect_values::(&db), + gen_expected_values(&[(3, 3), (4, 4), (5, 5)]), + ); +} + +#[test] +fn test_reopen() { + let tmpdir = tempfile::tempdir().unwrap(); + { + let db = open_db(&tmpdir); + db.put::(&TestField(0), &TestField(0)).unwrap(); + assert_eq!( + db.get::(&TestField(0)).unwrap(), + Some(TestField(0)), + ); + } + { + let db = open_db(&tmpdir); + assert_eq!( + db.get::(&TestField(0)).unwrap(), + Some(TestField(0)), + ); + } +} + +#[test] +fn test_open_read_only() { + let tmpdir = tempfile::tempdir().unwrap(); + { + let db = open_db(&tmpdir); + db.put::(&TestField(0), &TestField(0)).unwrap(); + } + { + let db = open_db_read_only(&tmpdir); + assert_eq!( + db.get::(&TestField(0)).unwrap(), + Some(TestField(0)), + ); + assert!(db.put::(&TestField(1), &TestField(1)).is_err()); + } +} + +#[test] +fn test_open_as_secondary() { + let tmpdir = tempfile::tempdir().unwrap(); + let tmpdir_sec = tempfile::tempdir().unwrap(); + + let db = open_db(&tmpdir); + db.put::(&TestField(0), &TestField(0)).unwrap(); + + let db_sec = open_db_as_secondary(&tmpdir, &tmpdir_sec); + assert_eq!( + db_sec.get::(&TestField(0)).unwrap(), + Some(TestField(0)), + ); +} + +#[test] +fn test_report_size() { + let db = TestDB::new(); + + for i in 0..1000 
+        let mut db_batch = SchemaBatch::new();
+        db_batch
+            .put::<TestSchema1>(&TestField(i), &TestField(i))
+            .unwrap();
+        db_batch
+            .put::<TestSchema2>(&TestField(i), &TestField(i))
+            .unwrap();
+        db.write_schemas(db_batch).unwrap();
+    }
+
+    db.flush_cf("TestCF1").unwrap();
+    db.flush_cf("TestCF2").unwrap();
+
+    assert!(
+        db.get_property("TestCF1", "rocksdb.estimate-live-data-size")
+            .unwrap()
+            > 0
+    );
+    assert!(
+        db.get_property("TestCF2", "rocksdb.estimate-live-data-size")
+            .unwrap()
+            > 0
+    );
+    assert_eq!(
+        db.get_property("default", "rocksdb.estimate-live-data-size")
+            .unwrap(),
+        0
+    );
+}
+
+#[test]
+fn test_checkpoint() {
+    let tmpdir = tempfile::tempdir().unwrap();
+    let checkpoint_parent = tempfile::tempdir().unwrap();
+    let checkpoint = checkpoint_parent.path().join("checkpoint");
+    {
+        let db = open_db(&tmpdir);
+        db.put::<TestSchema1>(&TestField(0), &TestField(0)).unwrap();
+        db.create_checkpoint(&checkpoint).unwrap();
+    }
+    {
+        let db = open_db(&tmpdir);
+        assert_eq!(
+            db.get::<TestSchema1>(&TestField(0)).unwrap(),
+            Some(TestField(0)),
+        );
+
+        let cp = open_db(&checkpoint);
+        assert_eq!(
+            cp.get::<TestSchema1>(&TestField(0)).unwrap(),
+            Some(TestField(0)),
+        );
+        cp.put::<TestSchema1>(&TestField(1), &TestField(1)).unwrap();
+        assert_eq!(
+            cp.get::<TestSchema1>(&TestField(1)).unwrap(),
+            Some(TestField(1)),
+        );
+        assert_eq!(db.get::<TestSchema1>(&TestField(1)).unwrap(), None);
+    }
+}
diff --git a/tests/iterator_test.rs b/tests/iterator_test.rs
new file mode 100644
index 0000000..0dbfb5b
--- /dev/null
+++ b/tests/iterator_test.rs
@@ -0,0 +1,528 @@
+// Copyright (c) Aptos
+// SPDX-License-Identifier: Apache-2.0
+
+use std::sync::{Arc, RwLock};
+
+use rockbound::schema::{KeyDecoder, KeyEncoder, ValueCodec};
+use rockbound::snapshot::{DbSnapshot, ReadOnlyLock, SingleSnapshotQueryManager};
+use rockbound::test::{KeyPrefix1, KeyPrefix2, TestCompositeField, TestField};
+use rockbound::{
+    define_schema, Operation, Schema, SchemaBatch, SchemaIterator, SeekKeyEncoder, DB,
+};
+use rocksdb::DEFAULT_COLUMN_FAMILY_NAME;
+use tempfile::TempDir;
+
+define_schema!(TestSchema, TestCompositeField, TestField, "TestCF");
+
+type S = TestSchema;
+
+fn collect_values(iter: SchemaIterator<S>) -> Vec<u32> {
+    iter.map(|row| row.unwrap().value.0).collect()
+}
+
+fn decode_key(key: &[u8]) -> TestCompositeField {
+    <TestCompositeField as KeyDecoder<S>>::decode_key(key).unwrap()
+}
+
+fn encode_key(key: &TestCompositeField) -> Vec<u8> {
+    <TestCompositeField as KeyEncoder<S>>::encode_key(key).unwrap()
+}
+
+fn encode_value(value: &TestField) -> Vec<u8> {
+    <TestField as ValueCodec<S>>::encode_value(value).unwrap()
+}
+
+struct TestDB {
+    _tmpdir: TempDir,
+    db: DB,
+}
+
+impl TestDB {
+    fn new() -> Self {
+        let tmpdir = tempfile::tempdir().unwrap();
+        let column_families = vec![DEFAULT_COLUMN_FAMILY_NAME, S::COLUMN_FAMILY_NAME];
+        let mut db_opts = rocksdb::Options::default();
+        db_opts.create_if_missing(true);
+        db_opts.create_missing_column_families(true);
+        let db = DB::open(tmpdir.path(), "test", column_families, &db_opts).unwrap();
+
+        db.put::<S>(&TestCompositeField(1, 0, 0), &TestField(100))
+            .unwrap();
+        db.put::<S>(&TestCompositeField(1, 0, 2), &TestField(102))
+            .unwrap();
+        db.put::<S>(&TestCompositeField(1, 0, 4), &TestField(104))
+            .unwrap();
+        db.put::<S>(&TestCompositeField(1, 1, 0), &TestField(110))
+            .unwrap();
+        db.put::<S>(&TestCompositeField(1, 1, 2), &TestField(112))
+            .unwrap();
+        db.put::<S>(&TestCompositeField(1, 1, 4), &TestField(114))
+            .unwrap();
+        db.put::<S>(&TestCompositeField(2, 0, 0), &TestField(200))
+            .unwrap();
+        db.put::<S>(&TestCompositeField(2, 0, 2), &TestField(202))
+            .unwrap();
+
+        TestDB {
+            _tmpdir: tmpdir,
+            db,
+        }
+    }
+}
+
+impl TestDB {
+    fn iter(&self) -> SchemaIterator<S> {
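+        // Helper: forward iterator over the test column family; tests position it with seek* before reading.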
+        self.db.iter().expect("Failed to create iterator.")
+    }
+
+    fn rev_iter(&self) -> SchemaIterator<S> {
+        self.db.iter().expect("Failed to create iterator.").rev()
+    }
+}
+
+impl std::ops::Deref for TestDB {
+    type Target = DB;
+
+    fn deref(&self) -> &Self::Target {
+        &self.db
+    }
+}
+
+#[test]
+fn test_seek_to_first() {
+    let db = TestDB::new();
+
+    let mut iter = db.iter();
+    iter.seek_to_first();
+    assert_eq!(
+        collect_values(iter),
+        [100, 102, 104, 110, 112, 114, 200, 202]
+    );
+
+    let mut iter = db.rev_iter();
+    iter.seek_to_first();
+    assert_eq!(collect_values(iter), [100]);
+}
+
+#[test]
+fn test_seek_to_last() {
+    let db = TestDB::new();
+
+    let mut iter = db.iter();
+    iter.seek_to_last();
+    assert_eq!(collect_values(iter), [202]);
+
+    let mut iter = db.rev_iter();
+    iter.seek_to_last();
+    assert_eq!(
+        collect_values(iter),
+        [202, 200, 114, 112, 110, 104, 102, 100]
+    );
+}
+
+#[test]
+fn test_seek_by_existing_key() {
+    let db = TestDB::new();
+
+    let mut iter = db.iter();
+    iter.seek(&TestCompositeField(1, 1, 0)).unwrap();
+    assert_eq!(collect_values(iter), [110, 112, 114, 200, 202]);
+
+    let mut iter = db.rev_iter();
+    iter.seek(&TestCompositeField(1, 1, 0)).unwrap();
+    assert_eq!(collect_values(iter), [110, 104, 102, 100]);
+}
+
+#[test]
+fn test_seek_by_nonexistent_key() {
+    let db = TestDB::new();
+
+    let mut iter = db.iter();
+    iter.seek(&TestCompositeField(1, 1, 1)).unwrap();
+    assert_eq!(collect_values(iter), [112, 114, 200, 202]);
+
+    let mut iter = db.rev_iter();
+    iter.seek(&TestCompositeField(1, 1, 1)).unwrap();
+    assert_eq!(collect_values(iter), [112, 110, 104, 102, 100]);
+}
+
+#[test]
+fn test_seek_for_prev_by_existing_key() {
+    let db = TestDB::new();
+
+    let mut iter = db.iter();
+    iter.seek_for_prev(&TestCompositeField(1, 1, 0)).unwrap();
+    assert_eq!(collect_values(iter), [110, 112, 114, 200, 202]);
+
+    let mut iter = db.rev_iter();
+    iter.seek_for_prev(&TestCompositeField(1, 1, 0)).unwrap();
+    assert_eq!(collect_values(iter), [110, 104, 102, 100]);
+}
+
+#[test]
+fn test_seek_for_prev_by_nonexistent_key() {
+    let db = TestDB::new();
+
+    let mut iter = db.iter();
+    iter.seek_for_prev(&TestCompositeField(1, 1, 1)).unwrap();
+    assert_eq!(collect_values(iter), [110, 112, 114, 200, 202]);
+
+    let mut iter = db.rev_iter();
+    iter.seek_for_prev(&TestCompositeField(1, 1, 1)).unwrap();
+    assert_eq!(collect_values(iter), [110, 104, 102, 100]);
+}
+
+#[test]
+fn test_seek_by_1prefix() {
+    let db = TestDB::new();
+
+    let mut iter = db.iter();
+    iter.seek(&KeyPrefix1(2)).unwrap();
+    assert_eq!(collect_values(iter), [200, 202]);
+
+    let mut iter = db.rev_iter();
+    iter.seek(&KeyPrefix1(2)).unwrap();
+    assert_eq!(collect_values(iter), [200, 114, 112, 110, 104, 102, 100]);
+}
+
+#[test]
+fn test_seek_for_prev_by_1prefix() {
+    let db = TestDB::new();
+
+    let mut iter = db.iter();
+    iter.seek_for_prev(&KeyPrefix1(2)).unwrap();
+    assert_eq!(collect_values(iter), [114, 200, 202]);
+
+    let mut iter = db.rev_iter();
+    iter.seek_for_prev(&KeyPrefix1(2)).unwrap();
+    assert_eq!(collect_values(iter), [114, 112, 110, 104, 102, 100]);
+}
+
+#[test]
+fn test_seek_by_2prefix() {
+    let db = TestDB::new();
+
+    let mut iter = db.iter();
+    iter.seek(&KeyPrefix2(2, 0)).unwrap();
+    assert_eq!(collect_values(iter), [200, 202]);
+
+    let mut iter = db.rev_iter();
+    iter.seek(&KeyPrefix2(2, 0)).unwrap();
+    assert_eq!(collect_values(iter), [200, 114, 112, 110, 104, 102, 100]);
+}
+
+#[test]
+fn test_seek_for_prev_by_2prefix() {
+    let db = TestDB::new();
+
+    let mut iter = db.iter();
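+    // seek_for_prev by the (2, 0) prefix positions on (1, 1, 4), the last key ordered before the prefix.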
+    iter.seek_for_prev(&KeyPrefix2(2, 0)).unwrap();
+    assert_eq!(collect_values(iter), [114, 200, 202]);
+
+    let mut iter = db.rev_iter();
+    iter.seek_for_prev(&KeyPrefix2(2, 0)).unwrap();
+    assert_eq!(collect_values(iter), [114, 112, 110, 104, 102, 100]);
+}
+
+#[test]
+fn test_schema_batch_iteration_order() {
+    let mut batch = SchemaBatch::new();
+
+    // Operations listed in the order the batch iterator is expected to yield them (descending by key).
+    let operations = vec![
+        (TestCompositeField(2, 0, 0), TestField(600)),
+        (TestCompositeField(1, 3, 4), TestField(500)),
+        (TestCompositeField(1, 3, 3), TestField(400)),
+        (TestCompositeField(1, 3, 2), TestField(300)),
+        (TestCompositeField(1, 3, 0), TestField(200)),
+        (TestCompositeField(1, 2, 0), TestField(100)),
+    ];
+
+    // Insert them out of order.
+    for i in [4, 2, 0, 1, 3, 5] {
+        let (key, value) = &operations[i];
+        batch.put::<S>(key, value).unwrap();
+    }
+
+    let iter = batch.iter::<S>();
+    let collected: Vec<_> = iter
+        .filter_map(|(key, value)| match value {
+            Operation::Put { value } => Some((
+                decode_key(key),
+                <TestField as ValueCodec<S>>::decode_value(value).unwrap(),
+            )),
+            Operation::Delete => None,
+        })
+        .collect();
+    assert_eq!(operations, collected);
+}
+
+#[test]
+fn test_schema_batch_iteration_with_deletions() {
+    let mut batch = SchemaBatch::new();
+
+    batch
+        .put::<S>(&TestCompositeField(8, 0, 0), &TestField(6))
+        .unwrap();
+    batch.delete::<S>(&TestCompositeField(9, 0, 0)).unwrap();
+    batch
+        .put::<S>(&TestCompositeField(12, 0, 0), &TestField(1))
+        .unwrap();
+    batch
+        .put::<S>(&TestCompositeField(1, 0, 0), &TestField(2))
+        .unwrap();
+    let mut iter = batch.iter::<S>().peekable();
+    let first1 = iter.peek().unwrap();
+    assert_eq!(first1.0, &encode_key(&TestCompositeField(12, 0, 0)));
+    assert_eq!(
+        first1.1,
+        &Operation::Put {
+            value: encode_value(&TestField(1)),
+        }
+    );
+    let collected: Vec<_> = iter.collect();
+    assert_eq!(4, collected.len());
+}
+
+#[test]
+fn test_schema_batch_iter_range() {
+    let mut batch = SchemaBatch::new();
+
+    batch
+        .put::<S>(&TestCompositeField(8, 0, 0), &TestField(5))
+        .unwrap();
+    batch.delete::<S>(&TestCompositeField(9, 0, 0)).unwrap();
+    batch
+        .put::<S>(&TestCompositeField(8, 1, 0), &TestField(3))
+        .unwrap();
+
+    batch
+        .put::<S>(&TestCompositeField(11, 0, 0), &TestField(6))
+        .unwrap();
+    batch
+        .put::<S>(&TestCompositeField(13, 0, 0), &TestField(2))
+        .unwrap();
+    batch
+        .put::<S>(&TestCompositeField(12, 0, 0), &TestField(1))
+        .unwrap();
+
+    let seek_key =
+        <TestCompositeField as SeekKeyEncoder<S>>::encode_seek_key(&TestCompositeField(11, 0, 0))
+            .unwrap();
+
+    let mut iter = batch.iter_range::<S>(seek_key);
+
+    assert_eq!(
+        Some((
+            &encode_key(&TestCompositeField(11, 0, 0)),
+            &Operation::Put {
+                value: encode_value(&TestField(6))
+            }
+        )),
+        iter.next()
+    );
+    assert_eq!(
+        Some((
+            &encode_key(&TestCompositeField(9, 0, 0)),
+            &Operation::Delete
+        )),
+        iter.next()
+    );
+    assert_eq!(
+        Some((
+            &encode_key(&TestCompositeField(8, 1, 0)),
+            &Operation::Put {
+                value: encode_value(&TestField(3))
+            }
+        )),
+        iter.next()
+    );
+    assert_eq!(
+        Some((
+            &encode_key(&TestCompositeField(8, 0, 0)),
+            &Operation::Put {
+                value: encode_value(&TestField(5))
+            }
+        )),
+        iter.next()
+    );
+    assert_eq!(None, iter.next());
+}
+
+#[test]
+fn test_db_snapshot_get_last_value() {
+    let manager = Arc::new(RwLock::new(SingleSnapshotQueryManager::default()));
+
+    let snapshot_1 = DbSnapshot::new(0, ReadOnlyLock::new(manager.clone()));
+
+    assert!(snapshot_1.get_largest::<S>().unwrap().is_none());
+
+    let key_1 = TestCompositeField(8, 2, 3);
+    let value_1 = TestField(6);
+
+    snapshot_1.put::<S>(&key_1, &value_1).unwrap();
+
+    {
+        let (latest_key, latest_value) = snapshot_1
+            .get_largest::<S>()
+            .unwrap()
+            .expect("largest key-value pair should be found");
+        assert_eq!(key_1, latest_key);
+        assert_eq!(value_1, latest_value);
+    }
+
+    {
+        let mut manager = manager.write().unwrap();
+        manager.add_snapshot(snapshot_1.into());
+    }
+
+    let snapshot_2 = DbSnapshot::new(1, ReadOnlyLock::new(manager.clone()));
+
+    {
+        let (latest_key, latest_value) = snapshot_2
+            .get_largest::<S>()
+            .unwrap()
+            .expect("largest key-value pair should be found");
+        assert_eq!(key_1, latest_key);
+        assert_eq!(value_1, latest_value);
+    }
+
+    let key_2 = TestCompositeField(8, 1, 3);
+    let value_2 = TestField(7);
+    snapshot_2.put::<S>(&key_2, &value_2).unwrap();
+    {
+        let (latest_key, latest_value) = snapshot_2
+            .get_largest::<S>()
+            .unwrap()
+            .expect("largest key-value pair should be found");
+        assert_eq!(key_1, latest_key);
+        assert_eq!(value_1, latest_value);
+    }
+
+    // The largest value from the local snapshot is picked up.
+    let key_3 = TestCompositeField(8, 3, 1);
+    let value_3 = TestField(8);
+    snapshot_2.put::<S>(&key_3, &value_3).unwrap();
+    {
+        let (latest_key, latest_value) = snapshot_2
+            .get_largest::<S>()
+            .unwrap()
+            .expect("largest key-value pair should be found");
+        assert_eq!(key_3, latest_key);
+        assert_eq!(value_3, latest_value);
+    }
+
+    // Deletion: the previous largest value is returned.
+    snapshot_2.delete::<S>(&key_3).unwrap();
+    {
+        let (latest_key, latest_value) = snapshot_2
+            .get_largest::<S>()
+            .unwrap()
+            .expect("largest key-value pair should be found");
+        assert_eq!(key_1, latest_key);
+        assert_eq!(value_1, latest_value);
+    }
+}
+
+#[test]
+fn test_db_snapshot_get_prev_value() {
+    let manager = Arc::new(RwLock::new(SingleSnapshotQueryManager::default()));
+
+    // Snapshots 1 and 2 exercise the parent-snapshot iterators as a black box.
+    let snapshot_1 = DbSnapshot::new(0, ReadOnlyLock::new(manager.clone()));
+
+    let key_1 = TestCompositeField(8, 2, 3);
+    let key_2 = TestCompositeField(8, 2, 0);
+    let key_3 = TestCompositeField(8, 3, 2);
+
+    assert!(snapshot_1.get_prev::<S>(&key_1).unwrap().is_none());
+
+    snapshot_1.put::<S>(&key_2, &TestField(10)).unwrap();
+    snapshot_1.put::<S>(&key_1, &TestField(1)).unwrap();
+    snapshot_1
+        .put::<S>(&TestCompositeField(8, 1, 3), &TestField(11))
+        .unwrap();
+    snapshot_1
+        .put::<S>(&TestCompositeField(7, 2, 3), &TestField(12))
+        .unwrap();
+    snapshot_1
+        .put::<S>(&TestCompositeField(8, 2, 5), &TestField(13))
+        .unwrap();
+    snapshot_1.put::<S>(&key_3, &TestField(14)).unwrap();
+
+    // Exact match: the value stored at key_1 is returned.
+    assert_eq!(
+        (key_1.clone(), TestField(1)),
+        snapshot_1.get_prev::<S>(&key_1).unwrap().unwrap()
+    );
+    // Previous key: the value stored at (8, 2, 0).
+    assert_eq!(
+        (key_2.clone(), TestField(10)),
+        snapshot_1
+            .get_prev::<S>(&TestCompositeField(8, 2, 1))
+            .unwrap()
+            .unwrap()
+    );
+
+    {
+        let mut manager = manager.write().unwrap();
+        manager.add_snapshot(snapshot_1.into());
+    }
+
+    let snapshot_2 = DbSnapshot::new(1, ReadOnlyLock::new(manager.clone()));
+    // Exact match: the value stored at key_1 is returned.
+    assert_eq!(
+        (key_1.clone(), TestField(1)),
+        snapshot_2.get_prev::<S>(&key_1).unwrap().unwrap()
+    );
+    // Previous key: the value stored at (8, 2, 0).
+    assert_eq!(
+        (key_2.clone(), TestField(10)),
+        snapshot_2
+            .get_prev::<S>(&TestCompositeField(8, 2, 1))
+            .unwrap()
+            .unwrap()
+    );
+    snapshot_2.put::<S>(&key_2, &TestField(20)).unwrap();
+    snapshot_2.put::<S>(&key_1, &TestField(2)).unwrap();
+    // Values updated in the local snapshot take priority over the parent's.
+    assert_eq!(
+        (key_1.clone(), TestField(2)),
+        snapshot_2.get_prev::<S>(&key_1).unwrap().unwrap()
+    );
+    assert_eq!(
+        (key_2.clone(), TestField(20)),
+        snapshot_2
+            .get_prev::<S>(&TestCompositeField(8, 2, 1))
+            .unwrap()
+            .unwrap()
+    );
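+    // Deleting key_1 in the local snapshot makes get_prev fall back to the previous key, key_2.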
+    snapshot_2.delete::<S>(&key_1).unwrap();
+    assert_eq!(
+        (key_2.clone(), TestField(20)),
+        snapshot_2.get_prev::<S>(&key_1).unwrap().unwrap()
+    );
+    {
+        let mut manager = manager.write().unwrap();
+        manager.add_snapshot(snapshot_2.into());
+    }
+    let snapshot_3 = DbSnapshot::new(2, ReadOnlyLock::new(manager.clone()));
+    assert_eq!(
+        (key_2.clone(), TestField(20)),
+        snapshot_3
+            .get_prev::<S>(&TestCompositeField(8, 2, 1))
+            .unwrap()
+            .unwrap()
+    );
+    assert_eq!(
+        (key_2.clone(), TestField(20)),
+        snapshot_3.get_prev::<S>(&key_1).unwrap().unwrap()
+    );
+    assert_eq!(
+        (key_3, TestField(14)),
+        snapshot_3
+            .get_prev::<S>(&TestCompositeField(8, 3, 4))
+            .unwrap()
+            .unwrap()
+    );
+}
diff --git a/tests/snapshot_test.rs b/tests/snapshot_test.rs
new file mode 100644
index 0000000..00376a2
--- /dev/null
+++ b/tests/snapshot_test.rs
@@ -0,0 +1,49 @@
+use std::sync::{Arc, RwLock};
+
+use rockbound::define_schema;
+use rockbound::snapshot::{DbSnapshot, ReadOnlyLock, SingleSnapshotQueryManager};
+use rockbound::test::TestField;
+
+define_schema!(TestSchema1, TestField, TestField, "TestCF1");
+
+type S = TestSchema1;
+
+#[test]
+fn snapshot_lifecycle() {
+    let manager = Arc::new(RwLock::new(SingleSnapshotQueryManager::default()));
+
+    let key = TestField(1);
+    let value = TestField(1);
+
+    let snapshot_1 = DbSnapshot::new(0, ReadOnlyLock::new(manager.clone()));
+    assert_eq!(
+        None,
+        snapshot_1.read::<S>(&key).unwrap(),
+        "Incorrect value, should find nothing"
+    );
+
+    snapshot_1.put::<S>(&key, &value).unwrap();
+    assert_eq!(
+        Some(value),
+        snapshot_1.read::<S>(&key).unwrap(),
+        "Incorrect value, should be fetched from local cache"
+    );
+    {
+        let mut manager = manager.write().unwrap();
+        manager.add_snapshot(snapshot_1.into());
+    }
+
+    // Snapshot 2: reads the value from snapshot 1, then deletes it.
+    let snapshot_2 = DbSnapshot::new(1, ReadOnlyLock::new(manager.clone()));
+    assert_eq!(Some(value), snapshot_2.read::<S>(&key).unwrap());
+    snapshot_2.delete::<S>(&key).unwrap();
+    assert_eq!(None, snapshot_2.read::<S>(&key).unwrap());
+    {
+        let mut manager = manager.write().unwrap();
+        manager.add_snapshot(snapshot_2.into());
+    }
+
+    // Snapshot 3: gets an empty result, even though the value exists in an earlier snapshot.
+    let snapshot_3 = DbSnapshot::new(2, ReadOnlyLock::new(manager.clone()));
+    assert_eq!(None, snapshot_3.read::<S>(&key).unwrap());
+}