Simplified version of CacheDb
citizen-stig committed May 14, 2024
1 parent 4b76fd6 commit eb8e3de
Showing 2 changed files with 27 additions and 137 deletions.
162 changes: 26 additions & 136 deletions src/cache/delta_db.rs → src/cache/delta_reader.rs
@@ -1,97 +1,50 @@
#![allow(dead_code)]
//! This module contains the next iteration of [`crate::cache::cache_db::CacheDb`]
use crate::cache::change_set::{ChangeSet, ChangeSetIter};
use crate::cache::SnapshotId;
use crate::cache::change_set::ChangeSetIter;
use crate::iterator::{RawDbIter, ScanDirection};
use crate::schema::{KeyCodec, ValueCodec};
use crate::schema::KeyCodec;
use crate::{
Operation, PaginatedResponse, Schema, SchemaBatch, SchemaKey, SchemaValue, SeekKeyEncoder, DB,
};
use std::cmp::Ordering;
use std::iter::{Peekable, Rev};
use std::sync::{Arc, Mutex};
use std::sync::Arc;

/// Intermediate step between [`crate::cache::cache_db::CacheDb`] and future DeltaDbReader
/// Supports "local writes". And for historical reading it uses `Vec<Arc<ChangeSet>`
#[derive(Debug)]
pub struct DeltaDb {
/// Local writes are collected here.
local_cache: Mutex<ChangeSet>,
pub struct DeltaReader {
/// Set of not finalized changes in **reverse** order.
snapshots: Vec<Arc<ChangeSet>>,
snapshots: Vec<Arc<SchemaBatch>>,
/// Reading finalized data from here.
db: DB,
}

impl DeltaDb {
/// Creates new [`DeltaDb`] with given `id`, `db` and `uncommited_changes`.
impl DeltaReader {
/// Creates new [`DeltaReader`] with given `db` and `uncommited_changes`.
/// `uncommited_changes` should be in reverse order.
pub fn new(id: SnapshotId, db: DB, uncommited_changes: Vec<Arc<ChangeSet>>) -> Self {
pub fn new(db: DB, uncommited_changes: Vec<Arc<SchemaBatch>>) -> Self {
Self {
local_cache: Mutex::new(ChangeSet::new(id)),
snapshots: uncommited_changes,
db,
}
}

/// Set a new value to a given key.
pub fn put<S: Schema>(
&self,
key: &impl KeyCodec<S>,
value: &impl ValueCodec<S>,
) -> anyhow::Result<()> {
self.local_cache
.lock()
.expect("Local ChangeSet lock must not be poisoned")
.operations
.put(key, value)
}

/// Delete given key. Deletion of the key is going to be propagated to the DB eventually.
pub fn delete<S: Schema>(&self, key: &impl KeyCodec<S>) -> anyhow::Result<()> {
self.local_cache
.lock()
.expect("Local ChangeSet lock must not be poisoned")
.operations
.delete(key)
}

/// Writes many operations at once in local cache, atomically.
pub fn write_many(&self, batch: SchemaBatch) -> anyhow::Result<()> {
let mut inner = self
.local_cache
.lock()
.expect("Local SchemaBatch lock must not be poisoned");
inner.operations.merge(batch);
Ok(())
}

/// Get a value, wherever it is.
pub fn read<S: Schema>(&self, key: &impl KeyCodec<S>) -> anyhow::Result<Option<S::Value>> {
pub fn get<S: Schema>(&self, key: &impl KeyCodec<S>) -> anyhow::Result<Option<S::Value>> {
// Some(Operation) means the key was touched; for a deletion we return None early.
// Only if no operation is found for the key do we go deeper.

// Hold local cache lock explicitly, so reads are atomic
let local_cache = self
.local_cache
.lock()
.expect("SchemaBatch lock should not be poisoned");

// 1. Check in cache
if let Some(operation) = local_cache.get(key)? {
return operation.decode_value::<S>();
}

// 2. Check in snapshots, in reverse order
// 1. Check in snapshots, in reverse order
for snapshot in self.snapshots.iter() {
if let Some(operation) = snapshot.get::<S>(key)? {
if let Some(operation) = snapshot.get_operation::<S>(key)? {
return operation.decode_value::<S>();
}
}

// 3. Check in DB
// 2. Check in DB
self.db.get(key)
}

@@ -126,17 +79,7 @@ impl DeltaDb
}

//
fn iter_rev<S: Schema>(
&self,
) -> anyhow::Result<DeltaDbIter<Rev<ChangeSetIter>, Rev<ChangeSetIter>>>
{
// Local iter
let change_set = self
.local_cache
.lock()
.expect("Local cache lock must not be poisoned");
let local_cache_iter = change_set.iter::<S>().rev();

fn iter_rev<S: Schema>(&self) -> anyhow::Result<DeltaReaderIter<Rev<ChangeSetIter>>> {
// Snapshot iterators
let snapshot_iterators = self
.snapshots
@@ -147,50 +90,33 @@ impl DeltaDb
// Db Iter
let db_iter = self.db.raw_iter::<S>(ScanDirection::Backward)?;

Ok(DeltaDbIter::new(
local_cache_iter,
Ok(DeltaReaderIter::new(
snapshot_iterators,
db_iter,
ScanDirection::Backward,
))
}

// ---------------------------------------------------------------------------------------------
// ---------------------------------------------------------------------------------------------
/// Get a clone of internal ChangeSet
pub fn clone_change_set(&self) -> ChangeSet {
let change_set = self
.local_cache
.lock()
.expect("Local change set lock is poisoned");
change_set.clone()
}
}

struct DeltaDbIter<'a, LocalCacheIter, SnapshotIter>
struct DeltaReaderIter<'a, SnapshotIter>
where
LocalCacheIter: Iterator<Item = (&'a SchemaKey, &'a Operation)>,
SnapshotIter: Iterator<Item = (&'a SchemaKey, &'a Operation)>,
{
local_cache_iter: Peekable<LocalCacheIter>,
snapshot_iterators: Vec<Peekable<SnapshotIter>>,
db_iter: Peekable<RawDbIter<'a>>,
direction: ScanDirection,
}

impl<'a, LocalCacheIter, SnapshotIter> DeltaDbIter<'a, LocalCacheIter, SnapshotIter>
impl<'a, SnapshotIter> DeltaReaderIter<'a, SnapshotIter>
where
LocalCacheIter: Iterator<Item = (&'a SchemaKey, &'a Operation)>,
SnapshotIter: Iterator<Item = (&'a SchemaKey, &'a Operation)>,
{
fn new(
local_cache_iter: LocalCacheIter,
snapshot_iterators: Vec<SnapshotIter>,
db_iter: RawDbIter<'a>,
direction: ScanDirection,
) -> Self {
Self {
local_cache_iter: local_cache_iter.peekable(),
snapshot_iterators: snapshot_iterators
.into_iter()
.map(|iter| iter.peekable())
@@ -206,12 +132,10 @@ enum DataLocation {
Db,
// Index inside `snapshot_iterators`
Snapshot(usize),
LocalCache,
}

impl<'a, LocalCacheIter, SnapshotIter> Iterator for DeltaDbIter<'a, LocalCacheIter, SnapshotIter>
impl<'a, SnapshotIter> Iterator for DeltaReaderIter<'a, SnapshotIter>
where
LocalCacheIter: Iterator<Item = (&'a SchemaKey, &'a Operation)>,
SnapshotIter: Iterator<Item = (&'a SchemaKey, &'a Operation)>,
{
// I wish
@@ -273,29 +197,6 @@ where
}
}

// Pick the next value from local cache
if let Some(&(peeked_key, _)) = self.local_cache_iter.peek() {
match next_value {
None => {
next_value_locations.push(DataLocation::LocalCache);
}
Some(next_key) => {
match (&self.direction, peeked_key.cmp(next_key)) {
(_, Ordering::Equal) => {
next_value_locations.push(DataLocation::LocalCache);
}
(ScanDirection::Backward, Ordering::Greater)
| (ScanDirection::Forward, Ordering::Less) => {
next_value_locations.clear();
next_value_locations.push(DataLocation::LocalCache);
}
// Key is not important for given an iterator direction
_ => {}
}
}
}
}

// All next values are observed at this point
// Handling actual change of the iterator state.
if let Some(latest_next_location) = next_value_locations.pop() {
@@ -308,9 +209,6 @@ where
DataLocation::Snapshot(idx) => {
let _ = self.snapshot_iterators[*idx].next().unwrap();
}
DataLocation::LocalCache => {
let _ = self.local_cache_iter.next().unwrap();
}
}
}

@@ -329,15 +227,6 @@ where
Operation::Delete => continue,
}
}
DataLocation::LocalCache => {
let (key, operation) = self.local_cache_iter.next().unwrap();
match operation {
Operation::Put { value } => {
return Some((key.to_vec(), value.to_vec()))
}
Operation::Delete => continue,
}
}
}
} else {
break;
@@ -352,6 +241,7 @@ mod tests {
use std::path::Path;

use rocksdb::DEFAULT_COLUMN_FAMILY_NAME;
use rockbound::schema::ValueCodec;

use super::*;
use crate::schema::KeyEncoder;
@@ -377,21 +267,21 @@ mod tests {
<TestField as ValueCodec<TestSchema>>::encode_value(value).unwrap()
}

fn put_value(delta_db: &DeltaDb, key: u32, value: u32) {
fn put_value(delta_db: &DeltaReader, key: u32, value: u32) {
delta_db
.put::<TestSchema>(&TestCompositeField(key, 0, 0), &TestField(value))
.unwrap();
}

fn check_value(delta_db: &DeltaDb, key: u32, expected_value: Option<u32>) {
fn check_value(delta_db: &DeltaReader, key: u32, expected_value: Option<u32>) {
let actual_value = delta_db
.read::<TestSchema>(&TestCompositeField(key, 0, 0))
.get::<TestSchema>(&TestCompositeField(key, 0, 0))
.unwrap()
.map(|v| v.0);
assert_eq!(expected_value, actual_value);
}

fn delete_key(delta_db: &DeltaDb, key: u32) {
fn delete_key(delta_db: &DeltaReader, key: u32) {
delta_db
.delete::<TestSchema>(&TestCompositeField(key, 0, 0))
.unwrap();
@@ -408,7 +298,7 @@ mod tests {
let tmpdir = tempfile::tempdir().unwrap();
let db = open_db(tmpdir.path());

let delta_db = DeltaDb::new(0, db, vec![]);
let delta_db = DeltaReader::new(db, vec![]);

// Not existing
check_value(&delta_db, 1, None);
@@ -423,15 +313,15 @@ mod tests {
}

mod iteration {
use crate::cache::delta_db::tests::open_db;
use crate::cache::delta_db::DeltaDb;
use crate::cache::delta_reader::tests::open_db;
use crate::cache::delta_reader::DeltaReader;

#[test]
fn test_empty_iterator() {
let tmpdir = tempfile::tempdir().unwrap();
let db = open_db(tmpdir.path());

let delta_db = DeltaDb::new(0, db, vec![]);
let delta_db = DeltaReader::new(db, vec![]);

let _iterator = delta_db.iter_rev::<super::TestSchema>().unwrap();
}
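For context: the DeltaReaderIter above performs a peek-and-pick merge over several already-sorted iterators, the snapshot iterators (newest first) and the raw DB iterator, where an Operation::Delete in a higher layer hides the key in every layer below it. The snippet below is a simplified, forward-only sketch of that merge idea with made-up types (MergeIter, Entry); it is not the crate's implementation.

use std::iter::Peekable;
use std::vec::IntoIter;

// Entry = (key, Some(value)) for a put, (key, None) for a delete.
type Entry = (u32, Option<&'static str>);

struct MergeIter {
    // Highest-precedence source first (snapshots newest-to-oldest, then the DB).
    sources: Vec<Peekable<IntoIter<Entry>>>,
}

impl Iterator for MergeIter {
    type Item = (u32, &'static str);

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // Peek every source and pick the smallest key; ties go to the
            // earlier (higher-precedence) source.
            let mut best: Option<(usize, u32)> = None;
            for (idx, src) in self.sources.iter_mut().enumerate() {
                if let Some(&(key, _)) = src.peek() {
                    let better = match best {
                        None => true,
                        Some((_, best_key)) => key < best_key,
                    };
                    if better {
                        best = Some((idx, key));
                    }
                }
            }
            let (winner, key) = best?;

            // Advance every source sitting on the same key, so entries shadowed
            // by a higher layer are skipped, and remember the winning value.
            let mut value = None;
            for (idx, src) in self.sources.iter_mut().enumerate() {
                if src.peek().map(|&(k, _)| k) == Some(key) {
                    let (_, v) = src.next().unwrap();
                    if idx == winner {
                        value = v;
                    }
                }
            }

            match value {
                Some(v) => return Some((key, v)), // a put wins
                None => continue,                 // a delete hides the key in lower layers
            }
        }
    }
}

fn main() {
    // Snapshot layer: overwrites key 1, deletes key 2.
    let snapshot = vec![(1, Some("new")), (2, None)].into_iter().peekable();
    // DB layer: older values for keys 1-3.
    let db = vec![(1, Some("old")), (2, Some("stale")), (3, Some("kept"))]
        .into_iter()
        .peekable();

    let merged: Vec<_> = MergeIter { sources: vec![snapshot, db] }.collect();
    assert_eq!(merged, vec![(1, "new"), (3, "kept")]);
}
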
2 changes: 1 addition & 1 deletion src/cache/mod.rs
@@ -3,7 +3,7 @@
pub mod cache_container;
pub mod cache_db;
pub mod change_set;
mod delta_db;
mod delta_reader;

/// Id of ChangeSet/snapshot/cache layer
pub type SnapshotId = u64;
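
A minimal usage sketch of the new DeltaReader API, based only on the signatures visible in this diff. It assumes the helper lives somewhere inside crate::cache (delta_reader is a private module in this commit), and TestSchema, TestCompositeField and TestField are the test placeholders from the diff rather than real schema types; their imports are omitted.

use std::sync::Arc;

use crate::cache::delta_reader::DeltaReader;
use crate::{SchemaBatch, DB};

// Hypothetical in-crate helper: layer a batch of pending changes over the DB
// and read a value back through DeltaReader::get. `read_through_pending` is
// not part of the crate; the schema types are the test placeholders.
fn read_through_pending(db: DB, pending: SchemaBatch) -> anyhow::Result<Option<u32>> {
    // Snapshots are passed newest-first ("reverse" order), matching what
    // `DeltaReader::new` documents above.
    let reader = DeltaReader::new(db, vec![Arc::new(pending)]);
    let value = reader.get::<TestSchema>(&TestCompositeField(1, 0, 0))?;
    Ok(value.map(|v| v.0))
}

Note that the reader is read-only: the local writes that CacheDb buffered behind a Mutex<ChangeSet> are gone, so DeltaReader needs no locking and can be shared freely for historical reads.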
