From 9dadfd17ae48a33d98b377aa638a75a86a0b8e17 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Sat, 2 Sep 2023 21:04:33 +0200 Subject: [PATCH] Split out storage engine module. --- src/sql/engine/kv.rs | 30 +- src/sql/engine/raft.rs | 14 +- src/storage/{kv => engine}/bitcask.rs | 12 +- .../golden/bitcask/compact-after | 0 .../golden/bitcask/compact-before | 0 src/storage/{kv => engine}/golden/bitcask/log | 0 src/storage/{kv => engine}/memory.rs | 8 +- src/storage/engine/mod.rs | 294 +++++++++++++++++ src/storage/kv/mod.rs | 296 +----------------- src/storage/kv/mvcc.rs | 92 +++--- src/storage/mod.rs | 1 + 11 files changed, 376 insertions(+), 371 deletions(-) rename src/storage/{kv => engine}/bitcask.rs (98%) rename src/storage/{kv => engine}/golden/bitcask/compact-after (100%) rename src/storage/{kv => engine}/golden/bitcask/compact-before (100%) rename src/storage/{kv => engine}/golden/bitcask/log (100%) rename src/storage/{kv => engine}/memory.rs (90%) create mode 100644 src/storage/engine/mod.rs diff --git a/src/sql/engine/kv.rs b/src/sql/engine/kv.rs index 488a335e7..330ecf36c 100644 --- a/src/sql/engine/kv.rs +++ b/src/sql/engine/kv.rs @@ -9,22 +9,22 @@ use std::borrow::Cow; use std::clone::Clone; use std::collections::HashSet; -/// A SQL engine based on an underlying MVCC key/value store -pub struct KV { - /// The underlying key/value store - pub(super) kv: kv::MVCC, +/// A SQL engine based on an underlying MVCC key/value store. +pub struct KV { + /// The underlying key/value store. + pub(super) kv: kv::MVCC, } // FIXME Implement Clone manually due to https://github.com/rust-lang/rust/issues/26925 -impl Clone for KV { +impl Clone for KV { fn clone(&self) -> Self { KV::new(self.kv.clone()) } } -impl KV { +impl KV { /// Creates a new key/value-based SQL engine - pub fn new(kv: kv::MVCC) -> Self { + pub fn new(kv: kv::MVCC) -> Self { Self { kv } } @@ -39,8 +39,8 @@ impl KV { } } -impl super::Engine for KV { - type Transaction = Transaction; +impl super::Engine for KV { + type Transaction = Transaction; fn begin(&self, mode: super::Mode) -> Result { Ok(Self::Transaction::new(self.kv.begin_with_mode(mode)?)) @@ -62,13 +62,13 @@ fn deserialize<'a, V: Deserialize<'a>>(bytes: &'a [u8]) -> Result { } /// An SQL transaction based on an MVCC key/value transaction -pub struct Transaction { - txn: kv::mvcc::Transaction, +pub struct Transaction { + txn: kv::mvcc::Transaction, } -impl Transaction { +impl Transaction { /// Creates a new SQL transaction from an MVCC transaction - fn new(txn: kv::mvcc::Transaction) -> Self { + fn new(txn: kv::mvcc::Transaction) -> Self { Self { txn } } @@ -99,7 +99,7 @@ impl Transaction { } } -impl super::Transaction for Transaction { +impl super::Transaction for Transaction { fn id(&self) -> u64 { self.txn.id() } @@ -270,7 +270,7 @@ impl super::Transaction for Transaction { } } -impl Catalog for Transaction { +impl Catalog for Transaction { fn create_table(&mut self, table: Table) -> Result<()> { if self.read_table(&table.name)?.is_some() { return Err(Error::Value(format!("Table {} already exists", table.name))); diff --git a/src/sql/engine/raft.rs b/src/sql/engine/raft.rs index 0981ebe41..0b27460d1 100644 --- a/src/sql/engine/raft.rs +++ b/src/sql/engine/raft.rs @@ -74,7 +74,7 @@ impl Raft { } /// Creates an underlying state machine for a Raft engine. - pub fn new_state(kv: kv::MVCC) -> Result> { + pub fn new_state(kv: kv::MVCC) -> Result> { State::new(kv) } @@ -260,17 +260,17 @@ impl Catalog for Transaction { } /// The Raft state machine for the Raft-based SQL engine, using a KV SQL engine -pub struct State { +pub struct State { /// The underlying KV SQL engine - engine: super::KV, + engine: super::KV, /// The last applied index applied_index: u64, } -impl State { +impl State { /// Creates a new Raft state maching using the given MVCC key/value store - pub fn new(store: kv::MVCC) -> Result { - let engine = super::KV::new(store); + pub fn new(engine: kv::MVCC) -> Result { + let engine = super::KV::new(engine); let applied_index = engine .get_metadata(b"applied_index")? .map(|b| Raft::deserialize(&b)) @@ -305,7 +305,7 @@ impl State { } } -impl raft::State for State { +impl raft::State for State { fn applied_index(&self) -> u64 { self.applied_index } diff --git a/src/storage/kv/bitcask.rs b/src/storage/engine/bitcask.rs similarity index 98% rename from src/storage/kv/bitcask.rs rename to src/storage/engine/bitcask.rs index 9ac7cb36a..116b17f24 100644 --- a/src/storage/kv/bitcask.rs +++ b/src/storage/engine/bitcask.rs @@ -1,4 +1,4 @@ -use super::Store; +use super::Engine; use crate::error::Result; use fs4::FileExt; @@ -6,7 +6,7 @@ use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write}; use std::path::PathBuf; /// A very simple variant of BitCask, itself a very simple log-structured -/// key-value store used e.g. by the Riak database. It is not compatible with +/// key-value engine used e.g. by the Riak database. It is not compatible with /// BitCask databases generated by other implementations. See: /// https://riak.com/assets/bitcask-intro.pdf /// @@ -93,7 +93,7 @@ impl std::fmt::Display for BitCask { } } -impl Store for BitCask { +impl Engine for BitCask { type ScanIterator<'a> = ScanIterator<'a>; fn delete(&mut self, key: &[u8]) -> Result<()> { @@ -380,14 +380,14 @@ impl Log { mod tests { use super::*; - const GOLDEN_DIR: &str = "src/storage/kv/golden/bitcask"; + const GOLDEN_DIR: &str = "src/storage/engine/golden/bitcask"; - super::super::tests::test_store!({ + super::super::tests::test_engine!({ let path = tempdir::TempDir::new("toydb")?.path().join("toydb"); BitCask::new(path)? }); - /// Creates a new BitCask store for testing. + /// Creates a new BitCask engine for testing. fn setup() -> Result { BitCask::new(tempdir::TempDir::new("toydb")?.path().join("toydb")) } diff --git a/src/storage/kv/golden/bitcask/compact-after b/src/storage/engine/golden/bitcask/compact-after similarity index 100% rename from src/storage/kv/golden/bitcask/compact-after rename to src/storage/engine/golden/bitcask/compact-after diff --git a/src/storage/kv/golden/bitcask/compact-before b/src/storage/engine/golden/bitcask/compact-before similarity index 100% rename from src/storage/kv/golden/bitcask/compact-before rename to src/storage/engine/golden/bitcask/compact-before diff --git a/src/storage/kv/golden/bitcask/log b/src/storage/engine/golden/bitcask/log similarity index 100% rename from src/storage/kv/golden/bitcask/log rename to src/storage/engine/golden/bitcask/log diff --git a/src/storage/kv/memory.rs b/src/storage/engine/memory.rs similarity index 90% rename from src/storage/kv/memory.rs rename to src/storage/engine/memory.rs index 5d2325f0e..e3e4335c8 100644 --- a/src/storage/kv/memory.rs +++ b/src/storage/engine/memory.rs @@ -1,7 +1,7 @@ -use super::Store; +use super::Engine; use crate::error::Result; -/// An in-memory key/value store using the Rust standard library B-tree +/// An in-memory key/value storage engine using the Rust standard library B-tree /// implementation. Data is not persisted. pub struct Memory { data: std::collections::BTreeMap, Vec>, @@ -20,7 +20,7 @@ impl std::fmt::Display for Memory { } } -impl Store for Memory { +impl Engine for Memory { type ScanIterator<'a> = ScanIterator<'a>; fn flush(&mut self) -> Result<()> { @@ -75,5 +75,5 @@ impl<'a> DoubleEndedIterator for ScanIterator<'a> { mod tests { use super::*; - super::super::tests::test_store!(Memory::new()); + super::super::tests::test_engine!(Memory::new()); } diff --git a/src/storage/engine/mod.rs b/src/storage/engine/mod.rs new file mode 100644 index 000000000..f07206321 --- /dev/null +++ b/src/storage/engine/mod.rs @@ -0,0 +1,294 @@ +mod bitcask; +mod memory; + +pub use bitcask::BitCask; +pub use memory::Memory; + +use crate::error::Result; + +/// A key/value storage engine, where both keys and values are arbitrary byte +/// strings between 0 B and 2 GB, stored in lexicographical key order. Writes +/// are only guaranteed durable after calling flush(). +/// +/// Only supports single-threaded use since all methods (including reads) take a +/// mutable reference -- serialized access can't be avoided anyway, since both +/// Raft execution and file access is serial. +pub trait Engine: std::fmt::Display + Send + Sync { + /// The iterator returned by scan(). Traits can't return "impl Trait", and + /// we don't want to use trait objects, so the type must be specified. + type ScanIterator<'a>: DoubleEndedIterator, Vec)>> + 'a + where + Self: 'a; + + /// Deletes a key, or does nothing if it does not exist. + fn delete(&mut self, key: &[u8]) -> Result<()>; + + /// Flushes any buffered data to the underlying storage medium. + fn flush(&mut self) -> Result<()>; + + /// Gets a value for a key, if it exists. + fn get(&mut self, key: &[u8]) -> Result>>; + + /// Iterates over an ordered range of key/value pairs. + fn scan>>(&mut self, range: R) -> Self::ScanIterator<'_>; + + /// Sets a value for a key, replacing the existing value if any. + fn set(&mut self, key: &[u8], value: Vec) -> Result<()>; +} + +#[cfg(test)] +mod tests { + /// Generates common tests for any Engine implementation. + macro_rules! test_engine { + ($setup:expr) => { + /// Tests Engine point operations, i.e. set, get, and delete. + #[test] + fn point_ops() -> Result<()> { + let mut s = $setup; + + // Getting a missing key should return None. + assert_eq!(s.get(b"a")?, None); + + // Setting and getting a key should return its value. + s.set(b"a", vec![1])?; + assert_eq!(s.get(b"a")?, Some(vec![1])); + + // Setting a different key should not affect the first. + s.set(b"b", vec![2])?; + assert_eq!(s.get(b"b")?, Some(vec![2])); + assert_eq!(s.get(b"a")?, Some(vec![1])); + + // Getting a different missing key should return None. The + // comparison is case-insensitive for strings. + assert_eq!(s.get(b"c")?, None); + assert_eq!(s.get(b"A")?, None); + + // Setting an existing key should replace its value. + s.set(b"a", vec![0])?; + assert_eq!(s.get(b"a")?, Some(vec![0])); + + // Deleting a key should remove it, but not affect others. + s.delete(b"a")?; + assert_eq!(s.get(b"a")?, None); + assert_eq!(s.get(b"b")?, Some(vec![2])); + + // Deletes are idempotent. + s.delete(b"a")?; + assert_eq!(s.get(b"a")?, None); + + Ok(()) + } + + #[test] + /// Tests Engine point operations on empty keys and values. These + /// are as valid as any other key/value. + fn point_ops_empty() -> Result<()> { + let mut s = $setup; + assert_eq!(s.get(b"")?, None); + s.set(b"", vec![])?; + assert_eq!(s.get(b"")?, Some(vec![])); + s.delete(b"")?; + assert_eq!(s.get(b"")?, None); + Ok(()) + } + + #[test] + /// Tests Engine point operations on keys and values of increasing + /// sizes, up to 16 MB. + fn point_ops_sizes() -> Result<()> { + let mut s = $setup; + + // Generate keys/values for increasing powers of two. + for size in (1..=24).map(|i| 1 << i) { + let bytes = "x".repeat(size); + let key = bytes.as_bytes(); + let value = bytes.clone().into_bytes(); + + assert_eq!(s.get(key)?, None); + s.set(key, value.clone())?; + assert_eq!(s.get(key)?, Some(value)); + s.delete(key)?; + assert_eq!(s.get(key)?, None); + } + + Ok(()) + } + + #[test] + /// Tests various Engine scans. + fn scan() -> Result<()> { + let mut s = $setup; + s.set(b"a", vec![1])?; + s.set(b"b", vec![2])?; + s.set(b"ba", vec![2, 1])?; + s.set(b"bb", vec![2, 2])?; + s.set(b"c", vec![3])?; + s.set(b"C", vec![3])?; + + #[track_caller] + fn assert_scan(iter: I, expect: Vec<(&[u8], Vec)>) -> Result<()> + where + I: Iterator, Vec)>>, + { + assert_eq!( + iter.collect::>>()?, + expect.into_iter().map(|(k, v)| (k.to_vec(), v)).collect::>() + ); + Ok(()) + } + + // Forward/reverse scans. + assert_scan( + s.scan(b"b".to_vec()..b"bz".to_vec()), + vec![(b"b", vec![2]), (b"ba", vec![2, 1]), (b"bb", vec![2, 2])], + )?; + assert_scan( + s.scan(b"b".to_vec()..b"bz".to_vec()).rev(), + vec![(b"bb", vec![2, 2]), (b"ba", vec![2, 1]), (b"b", vec![2])], + )?; + + // Inclusive/exclusive ranges. + assert_scan( + s.scan(b"b".to_vec()..b"bb".to_vec()), + vec![(b"b", vec![2]), (b"ba", vec![2, 1])], + )?; + assert_scan( + s.scan(b"b".to_vec()..=b"bb".to_vec()), + vec![(b"b", vec![2]), (b"ba", vec![2, 1]), (b"bb", vec![2, 2])], + )?; + + // Open ranges. + assert_scan(s.scan(b"bb".to_vec()..), vec![(b"bb", vec![2, 2]), (b"c", vec![3])])?; + assert_scan( + s.scan(..=b"b".to_vec()), + vec![(b"C", vec![3]), (b"a", vec![1]), (b"b", vec![2])], + )?; + + // Full range. + assert_scan( + s.scan(..), + vec![ + (b"C", vec![3]), + (b"a", vec![1]), + (b"b", vec![2]), + (b"ba", vec![2, 1]), + (b"bb", vec![2, 2]), + (b"c", vec![3]), + ], + )?; + Ok(()) + } + + #[test] + /// Runs random operations both on a Engine and a known-good + /// BTreeMap, comparing the results of each operation as well as the + /// final state. + fn random_ops() -> Result<()> { + const NUM_OPS: u64 = 1000; + + use rand::{seq::SliceRandom, Rng, RngCore}; + let seed: u64 = rand::thread_rng().gen(); + let mut rng: rand::rngs::StdRng = rand::SeedableRng::seed_from_u64(seed); + println!("seed = {}", seed); + + #[derive(Debug)] + enum Op { + Set, + Delete, + Get, + Scan, + } + + impl rand::distributions::Distribution for rand::distributions::Standard { + fn sample(&self, rng: &mut R) -> Op { + match rng.gen_range(0..=3) { + 0 => Op::Set, + 1 => Op::Delete, + 2 => Op::Get, + 3 => Op::Scan, + _ => panic!("unexpected value"), + } + } + } + + let mut s = $setup; + let mut keys: Vec> = Vec::new(); + let mut m = std::collections::BTreeMap::new(); + + // Pick an already-used key with 80% probability, or generate a + // new key. + let mut random_key = |mut rng: &mut rand::rngs::StdRng| -> Vec { + if rng.gen::() < 0.8 && !keys.is_empty() { + keys.choose(&mut rng).unwrap().clone() + } else { + let mut key = vec![0; rng.gen_range(0..=16)]; + rng.fill_bytes(&mut key); + keys.push(key.clone()); + key + } + }; + + let random_value = |rng: &mut rand::rngs::StdRng| -> Vec { + let mut value = vec![0; rng.gen_range(0..=16)]; + rng.fill_bytes(&mut value); + value + }; + + // Run random operations. + for _ in 0..NUM_OPS { + match rng.gen::() { + Op::Set => { + let key = random_key(&mut rng); + let value = random_value(&mut rng); + println!("set {:?} = {:?}", key, value); + s.set(&key, value.clone())?; + m.insert(key, value); + } + Op::Delete => { + let key = random_key(&mut rng); + println!("delete {:?}", key); + s.delete(&key)?; + m.remove(&key); + } + Op::Get => { + let key = random_key(&mut rng); + let value = s.get(&key)?; + let expect = m.get(&key).cloned(); + println!("get {:?} => {:?}", key, value); + assert_eq!(value, expect); + } + Op::Scan => { + let mut from = random_key(&mut rng); + let mut to = random_key(&mut rng); + if (to < from) { + (from, to) = (to, from) + } + println!("scan {:?} .. {:?}", from, to); + let result = + s.scan(from.clone()..to.clone()).collect::>>()?; + let expect = m + .range(from..to) + .map(|(k, v)| (k.clone(), v.clone())) + .collect::>(); + assert_eq!(result, expect); + } + } + } + + // Compare the final states. + println!("comparing final state"); + + let state = s.scan(..).collect::>>()?; + let expect = m + .range::, _>(..) + .map(|(k, v)| (k.clone(), v.clone())) + .collect::>(); + assert_eq!(state, expect); + + Ok(()) + } + }; + } + + pub(super) use test_engine; // export for use in submodules +} diff --git a/src/storage/kv/mod.rs b/src/storage/kv/mod.rs index 5e7536305..ca92b871f 100644 --- a/src/storage/kv/mod.rs +++ b/src/storage/kv/mod.rs @@ -1,297 +1,7 @@ -mod bitcask; pub mod encoding; -mod memory; pub mod mvcc; -pub use bitcask::BitCask; -pub use memory::Memory; +pub use super::engine::BitCask; +pub use super::engine::Engine; +pub use super::engine::Memory; pub use mvcc::MVCC; - -use crate::error::Result; - -/// A key/value storage engine, where both keys and values are arbitrary byte -/// strings between 0 B and 2 GB, stored in lexicographical key order. Writes -/// are only guaranteed durable after calling flush(). -/// -/// Only supports single-threaded use since all methods (including reads) take a -/// mutable reference -- serialized access can't be avoided anyway, since both -/// Raft execution and file access is serial. -pub trait Store: std::fmt::Display + Send + Sync { - /// The iterator returned by scan(). Traits can't return "impl Trait", and - /// we don't want to use trait objects, so the type must be specified. - type ScanIterator<'a>: DoubleEndedIterator, Vec)>> + 'a - where - Self: 'a; - - /// Deletes a key, or does nothing if it does not exist. - fn delete(&mut self, key: &[u8]) -> Result<()>; - - /// Flushes any buffered data to the underlying storage medium. - fn flush(&mut self) -> Result<()>; - - /// Gets a value for a key, if it exists. - fn get(&mut self, key: &[u8]) -> Result>>; - - /// Iterates over an ordered range of key/value pairs. - fn scan>>(&mut self, range: R) -> Self::ScanIterator<'_>; - - /// Sets a value for a key, replacing the existing value if any. - fn set(&mut self, key: &[u8], value: Vec) -> Result<()>; -} - -#[cfg(test)] -mod tests { - /// Generates common tests for any Store implementation. - macro_rules! test_store { - ($setup:expr) => { - /// Tests Store point operations, i.e. set, get, and delete. - #[test] - fn point_ops() -> Result<()> { - let mut s = $setup; - - // Getting a missing key should return None. - assert_eq!(s.get(b"a")?, None); - - // Setting and getting a key should return its value. - s.set(b"a", vec![1])?; - assert_eq!(s.get(b"a")?, Some(vec![1])); - - // Setting a different key should not affect the first. - s.set(b"b", vec![2])?; - assert_eq!(s.get(b"b")?, Some(vec![2])); - assert_eq!(s.get(b"a")?, Some(vec![1])); - - // Getting a different missing key should return None. The - // comparison is case-insensitive for strings. - assert_eq!(s.get(b"c")?, None); - assert_eq!(s.get(b"A")?, None); - - // Setting an existing key should replace its value. - s.set(b"a", vec![0])?; - assert_eq!(s.get(b"a")?, Some(vec![0])); - - // Deleting a key should remove it, but not affect others. - s.delete(b"a")?; - assert_eq!(s.get(b"a")?, None); - assert_eq!(s.get(b"b")?, Some(vec![2])); - - // Deletes are idempotent. - s.delete(b"a")?; - assert_eq!(s.get(b"a")?, None); - - Ok(()) - } - - #[test] - /// Tests Store point operations on empty keys and values. These - /// are as valid as any other key/value. - fn point_ops_empty() -> Result<()> { - let mut s = $setup; - assert_eq!(s.get(b"")?, None); - s.set(b"", vec![])?; - assert_eq!(s.get(b"")?, Some(vec![])); - s.delete(b"")?; - assert_eq!(s.get(b"")?, None); - Ok(()) - } - - #[test] - /// Tests Store point operations on keys and values of increasing - /// sizes, up to 16 MB. - fn point_ops_sizes() -> Result<()> { - let mut s = $setup; - - // Generate keys/values for increasing powers of two. - for size in (1..=24).map(|i| 1 << i) { - let bytes = "x".repeat(size); - let key = bytes.as_bytes(); - let value = bytes.clone().into_bytes(); - - assert_eq!(s.get(key)?, None); - s.set(key, value.clone())?; - assert_eq!(s.get(key)?, Some(value)); - s.delete(key)?; - assert_eq!(s.get(key)?, None); - } - - Ok(()) - } - - #[test] - /// Tests various Store scans. - fn scan() -> Result<()> { - let mut s = $setup; - s.set(b"a", vec![1])?; - s.set(b"b", vec![2])?; - s.set(b"ba", vec![2, 1])?; - s.set(b"bb", vec![2, 2])?; - s.set(b"c", vec![3])?; - s.set(b"C", vec![3])?; - - #[track_caller] - fn assert_scan(iter: I, expect: Vec<(&[u8], Vec)>) -> Result<()> - where - I: Iterator, Vec)>>, - { - assert_eq!( - iter.collect::>>()?, - expect.into_iter().map(|(k, v)| (k.to_vec(), v)).collect::>() - ); - Ok(()) - } - - // Forward/reverse scans. - assert_scan( - s.scan(b"b".to_vec()..b"bz".to_vec()), - vec![(b"b", vec![2]), (b"ba", vec![2, 1]), (b"bb", vec![2, 2])], - )?; - assert_scan( - s.scan(b"b".to_vec()..b"bz".to_vec()).rev(), - vec![(b"bb", vec![2, 2]), (b"ba", vec![2, 1]), (b"b", vec![2])], - )?; - - // Inclusive/exclusive ranges. - assert_scan( - s.scan(b"b".to_vec()..b"bb".to_vec()), - vec![(b"b", vec![2]), (b"ba", vec![2, 1])], - )?; - assert_scan( - s.scan(b"b".to_vec()..=b"bb".to_vec()), - vec![(b"b", vec![2]), (b"ba", vec![2, 1]), (b"bb", vec![2, 2])], - )?; - - // Open ranges. - assert_scan(s.scan(b"bb".to_vec()..), vec![(b"bb", vec![2, 2]), (b"c", vec![3])])?; - assert_scan( - s.scan(..=b"b".to_vec()), - vec![(b"C", vec![3]), (b"a", vec![1]), (b"b", vec![2])], - )?; - - // Full range. - assert_scan( - s.scan(..), - vec![ - (b"C", vec![3]), - (b"a", vec![1]), - (b"b", vec![2]), - (b"ba", vec![2, 1]), - (b"bb", vec![2, 2]), - (b"c", vec![3]), - ], - )?; - Ok(()) - } - - #[test] - /// Runs random operations both on a Store and a known-good - /// BTreeMap, comparing the results of each operation as well as the - /// final state. - fn random_ops() -> Result<()> { - const NUM_OPS: u64 = 1000; - - use rand::{seq::SliceRandom, Rng, RngCore}; - let seed: u64 = rand::thread_rng().gen(); - let mut rng: rand::rngs::StdRng = rand::SeedableRng::seed_from_u64(seed); - println!("seed = {}", seed); - - #[derive(Debug)] - enum Op { - Set, - Delete, - Get, - Scan, - } - - impl rand::distributions::Distribution for rand::distributions::Standard { - fn sample(&self, rng: &mut R) -> Op { - match rng.gen_range(0..=3) { - 0 => Op::Set, - 1 => Op::Delete, - 2 => Op::Get, - 3 => Op::Scan, - _ => panic!("unexpected value"), - } - } - } - - let mut s = $setup; - let mut keys: Vec> = Vec::new(); - let mut m = std::collections::BTreeMap::new(); - - // Pick an already-used key with 80% probability, or generate a - // new key. - let mut random_key = |mut rng: &mut rand::rngs::StdRng| -> Vec { - if rng.gen::() < 0.8 && !keys.is_empty() { - keys.choose(&mut rng).unwrap().clone() - } else { - let mut key = vec![0; rng.gen_range(0..=16)]; - rng.fill_bytes(&mut key); - keys.push(key.clone()); - key - } - }; - - let random_value = |rng: &mut rand::rngs::StdRng| -> Vec { - let mut value = vec![0; rng.gen_range(0..=16)]; - rng.fill_bytes(&mut value); - value - }; - - // Run random operations. - for _ in 0..NUM_OPS { - match rng.gen::() { - Op::Set => { - let key = random_key(&mut rng); - let value = random_value(&mut rng); - println!("set {:?} = {:?}", key, value); - s.set(&key, value.clone())?; - m.insert(key, value); - } - Op::Delete => { - let key = random_key(&mut rng); - println!("delete {:?}", key); - s.delete(&key)?; - m.remove(&key); - } - Op::Get => { - let key = random_key(&mut rng); - let value = s.get(&key)?; - let expect = m.get(&key).cloned(); - println!("get {:?} => {:?}", key, value); - assert_eq!(value, expect); - } - Op::Scan => { - let mut from = random_key(&mut rng); - let mut to = random_key(&mut rng); - if (to < from) { - (from, to) = (to, from) - } - println!("scan {:?} .. {:?}", from, to); - let result = - s.scan(from.clone()..to.clone()).collect::>>()?; - let expect = m - .range(from..to) - .map(|(k, v)| (k.clone(), v.clone())) - .collect::>(); - assert_eq!(result, expect); - } - } - } - - // Compare the final states. - println!("comparing final state"); - - let state = s.scan(..).collect::>>()?; - let expect = m - .range::, _>(..) - .map(|(k, v)| (k.clone(), v.clone())) - .collect::>(); - assert_eq!(state, expect); - - Ok(()) - } - }; - } - - pub(super) use test_store; // export for use in submodules -} diff --git a/src/storage/kv/mvcc.rs b/src/storage/kv/mvcc.rs index 3dbb729ba..fa7338f02 100644 --- a/src/storage/kv/mvcc.rs +++ b/src/storage/kv/mvcc.rs @@ -1,4 +1,4 @@ -use super::{encoding, Store}; +use super::{encoding, Engine}; use crate::error::{Error, Result}; use serde::{Deserialize, Serialize}; @@ -16,49 +16,49 @@ pub struct Status { pub storage: String, } -/// An MVCC-based transactional key-value store. -pub struct MVCC { - /// The underlying KV store. It is protected by a mutex so it can be shared between txns. - store: Arc>, +/// An MVCC-based transactional key-value engine. +pub struct MVCC { + /// The underlying KV engine. It is protected by a mutex so it can be shared between txns. + engine: Arc>, } -impl Clone for MVCC { +impl Clone for MVCC { fn clone(&self) -> Self { - MVCC { store: self.store.clone() } + MVCC { engine: self.engine.clone() } } } -impl MVCC { - /// Creates a new MVCC key-value store with the given key-value store for storage. - pub fn new(store: S) -> Self { - Self { store: Arc::new(Mutex::new(store)) } +impl MVCC { + /// Creates a new MVCC engine with the given storage engine. + pub fn new(engine: E) -> Self { + Self { engine: Arc::new(Mutex::new(engine)) } } /// Begins a new transaction in read-write mode. #[allow(dead_code)] - pub fn begin(&self) -> Result> { - Transaction::begin(self.store.clone(), Mode::ReadWrite) + pub fn begin(&self) -> Result> { + Transaction::begin(self.engine.clone(), Mode::ReadWrite) } /// Begins a new transaction in the given mode. - pub fn begin_with_mode(&self, mode: Mode) -> Result> { - Transaction::begin(self.store.clone(), mode) + pub fn begin_with_mode(&self, mode: Mode) -> Result> { + Transaction::begin(self.engine.clone(), mode) } /// Resumes a transaction with the given ID. - pub fn resume(&self, id: u64) -> Result> { - Transaction::resume(self.store.clone(), id) + pub fn resume(&self, id: u64) -> Result> { + Transaction::resume(self.engine.clone(), id) } /// Fetches an unversioned metadata value pub fn get_metadata(&self, key: &[u8]) -> Result>> { - let mut session = self.store.lock()?; + let mut session = self.engine.lock()?; session.get(&Key::Metadata(key.into()).encode()) } /// Sets an unversioned metadata value pub fn set_metadata(&self, key: &[u8], value: Vec) -> Result<()> { - let mut session = self.store.lock()?; + let mut session = self.engine.lock()?; session.set(&Key::Metadata(key.into()).encode(), value) } @@ -68,14 +68,14 @@ impl MVCC { // https://github.com/rust-lang/reference/issues/452 #[allow(clippy::needless_return)] pub fn status(&self) -> Result { - let mut store = self.store.lock()?; + let mut engine = self.engine.lock()?; return Ok(Status { - storage: store.to_string(), - txns: match store.get(&Key::TxnNext.encode())? { + storage: engine.to_string(), + txns: match engine.get(&Key::TxnNext.encode())? { Some(ref v) => deserialize(v)?, None => 1, } - 1, - txns_active: store + txns_active: engine .scan(Key::TxnActive(0).encode()..Key::TxnActive(std::u64::MAX).encode()) .try_fold(0, |count, r| r.map(|_| count + 1))?, }); @@ -93,9 +93,9 @@ fn deserialize<'a, V: Deserialize<'a>>(bytes: &'a [u8]) -> Result { } /// An MVCC transaction. -pub struct Transaction { - /// The underlying store for the transaction. Shared between transactions using a mutex. - store: Arc>, +pub struct Transaction { + /// The underlying engine for the transaction. Shared between transactions using a mutex. + engine: Arc>, /// The unique transaction ID. id: u64, /// The transaction mode. @@ -104,10 +104,10 @@ pub struct Transaction { snapshot: Snapshot, } -impl Transaction { +impl Transaction { /// Begins a new transaction in the given mode. - fn begin(store: Arc>, mode: Mode) -> Result { - let mut session = store.lock()?; + fn begin(engine: Arc>, mode: Mode) -> Result { + let mut session = engine.lock()?; let id = match session.get(&Key::TxnNext.encode())? { Some(ref v) => deserialize(v)?, @@ -121,26 +121,26 @@ impl Transaction { // for any future snapshot transactions looking at this one. let mut snapshot = Snapshot::take(&mut session, id)?; if let Mode::Snapshot { version } = &mode { - snapshot = Snapshot::restore(&mut session, *version)? + snapshot = Snapshot::reengine(&mut session, *version)? } drop(session); - Ok(Self { store, id, mode, snapshot }) + Ok(Self { engine, id, mode, snapshot }) } /// Resumes an active transaction with the given ID. Errors if the transaction is not active. - fn resume(store: Arc>, id: u64) -> Result { - let mut session = store.lock()?; + fn resume(engine: Arc>, id: u64) -> Result { + let mut session = engine.lock()?; let mode = match session.get(&Key::TxnActive(id).encode())? { Some(v) => deserialize(&v)?, None => return Err(Error::Value(format!("No active transaction {}", id))), }; let snapshot = match &mode { - Mode::Snapshot { version } => Snapshot::restore(&mut session, *version)?, - _ => Snapshot::restore(&mut session, id)?, + Mode::Snapshot { version } => Snapshot::reengine(&mut session, *version)?, + _ => Snapshot::reengine(&mut session, id)?, }; std::mem::drop(session); - Ok(Self { store, id, mode, snapshot }) + Ok(Self { engine, id, mode, snapshot }) } /// Returns the transaction ID. @@ -155,14 +155,14 @@ impl Transaction { /// Commits the transaction, by removing the txn from the active set. pub fn commit(self) -> Result<()> { - let mut session = self.store.lock()?; + let mut session = self.engine.lock()?; session.delete(&Key::TxnActive(self.id).encode())?; session.flush() } /// Rolls back the transaction, by removing all updated entries. pub fn rollback(self) -> Result<()> { - let mut session = self.store.lock()?; + let mut session = self.engine.lock()?; if self.mode.mutable() { let mut rollback = Vec::new(); let mut scan = session.scan( @@ -191,7 +191,7 @@ impl Transaction { /// Fetches a key. pub fn get(&self, key: &[u8]) -> Result>> { - let mut session = self.store.lock()?; + let mut session = self.engine.lock()?; let mut scan = session .scan(Key::Record(key.into(), 0).encode()..=Key::Record(key.into(), self.id).encode()) .rev(); @@ -220,8 +220,8 @@ impl Transaction { Bound::Included(k) => Bound::Included(Key::Record(k.into(), std::u64::MAX).encode()), Bound::Unbounded => Bound::Unbounded, }; - // TODO: For now, collect results from the store to not have to deal with lifetimes. - let scan = Box::new(self.store.lock()?.scan((start, end)).collect::>().into_iter()); + // TODO: For now, collect results from the engine to not have to deal with lifetimes. + let scan = Box::new(self.engine.lock()?.scan((start, end)).collect::>().into_iter()); Ok(Box::new(Scan::new(scan, self.snapshot.clone()))) } @@ -259,7 +259,7 @@ impl Transaction { if !self.mode.mutable() { return Err(Error::ReadOnly); } - let mut session = self.store.lock()?; + let mut session = self.engine.lock()?; // Check if the key is dirty, i.e. if it has any uncommitted changes, by scanning for any // versions that aren't visible to us. @@ -338,7 +338,7 @@ struct Snapshot { impl Snapshot { /// Takes a new snapshot, persisting it as `Key::TxnSnapshot(version)`. - fn take(session: &mut MutexGuard, version: u64) -> Result { + fn take(session: &mut MutexGuard, version: u64) -> Result { let mut snapshot = Self { version, invisible: HashSet::new() }; let mut scan = session.scan(Key::TxnActive(0).encode()..Key::TxnActive(version).encode()); while let Some((key, _)) = scan.next().transpose()? { @@ -352,8 +352,8 @@ impl Snapshot { Ok(snapshot) } - /// Restores an existing snapshot from `Key::TxnSnapshot(version)`, or errors if not found. - fn restore(session: &mut MutexGuard, version: u64) -> Result { + /// Reengines an existing snapshot from `Key::TxnSnapshot(version)`, or errors if not found. + fn reengine(session: &mut MutexGuard, version: u64) -> Result { match session.get(&Key::TxnSnapshot(version).encode())? { Some(ref v) => Ok(Self { version, invisible: deserialize(v)? }), None => Err(Error::Value(format!("Snapshot not found for version {}", version))), @@ -427,7 +427,7 @@ pub type ScanIterator<'a> = /// A key range scan. pub struct Scan<'a> { - /// The augmented KV store iterator, with key (decoded) and value. Note that we don't retain + /// The augmented KV engine iterator, with key (decoded) and value. Note that we don't retain /// the decoded version, so there will be multiple keys (for each version). We want the last. scan: Peekable>, /// Keeps track of next_back() seen key, whose previous versions should be ignored. diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 1f345d73d..bea47e2f6 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -1,2 +1,3 @@ +pub mod engine; pub mod kv; pub mod log;