Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removing old cache types #16

Merged
merged 1 commit into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
995 changes: 0 additions & 995 deletions src/cache/cache_container.rs

This file was deleted.

609 changes: 0 additions & 609 deletions src/cache/cache_db.rs

This file was deleted.

61 changes: 0 additions & 61 deletions src/cache/change_set.rs

This file was deleted.

6 changes: 0 additions & 6 deletions src/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,8 @@

use crate::Schema;

pub mod cache_container;
pub mod cache_db;
pub mod change_set;
pub mod delta_reader;

/// Id of ChangeSet/snapshot/cache layer
pub type SnapshotId = u64;

/// Response for a paginated query which also includes the "next" key to pass.
#[derive(Debug)]
pub struct PaginatedResponse<S: Schema> {
Expand Down
6 changes: 2 additions & 4 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use rocksdb::Options;

/// Port selected RocksDB options for tuning underlying rocksdb instance of our state db.
/// The current default values are taken from Aptos. TODO: tune rocksdb for our workload.
/// see <https://github.com/facebook/rocksdb/blob/master/include/rocksdb/options.h>
Expand Down Expand Up @@ -31,8 +29,8 @@ impl Default for RocksdbConfig {
}

/// Generate [`rocksdb::Options`] corresponding to the given [`RocksdbConfig`].
pub fn gen_rocksdb_options(config: &RocksdbConfig, readonly: bool) -> Options {
let mut db_opts = Options::default();
pub fn gen_rocksdb_options(config: &RocksdbConfig, readonly: bool) -> rocksdb::Options {
let mut db_opts = rocksdb::Options::default();
db_opts.set_max_open_files(config.max_open_files);
db_opts.set_max_total_wal_size(config.max_total_wal_size);
db_opts.set_max_background_jobs(config.max_background_jobs);
Expand Down
32 changes: 2 additions & 30 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ pub mod test;
pub use config::{gen_rocksdb_options, RocksdbConfig};

use std::path::Path;
use std::sync::{Arc, LockResult, RwLock, RwLockReadGuard};

use anyhow::format_err;
use iterator::ScanDirection;
Expand Down Expand Up @@ -103,7 +102,7 @@ impl DB {
Ok(Self::log_construct(name, inner))
}

/// Open db in secondary mode. A secondary db is does not support writes, but can be dynamically caught up
/// Open db in secondary mode. A secondary db does not support writes, but can be dynamically caught up
/// to the primary instance by a manual call. See <https://github.com/facebook/rocksdb/wiki/Read-only-and-Secondary-instances>
/// for more details.
pub fn open_cf_as_secondary<P: AsRef<Path>>(
Expand Down Expand Up @@ -413,7 +412,7 @@ pub enum CodecError {
Io(#[from] std::io::Error),
}

/// For now we always use synchronous writes. This makes sure that once the operation returns
/// For now, we always use synchronous writes. This makes sure that once the operation returns
/// `Ok(())` the data is persisted even if the machine crashes. In the future we might consider
/// selectively turning this off for some non-critical writes to improve performance.
fn default_write_options() -> rocksdb::WriteOptions {
Expand All @@ -422,33 +421,6 @@ fn default_write_options() -> rocksdb::WriteOptions {
opts
}

/// Read-only handle to a shared [`RwLock`].
///
/// Holding this type signals that the owner is only expected to read the
/// wrapped value. The inner `Arc<RwLock<T>>` is a clone, so some other part
/// of the code may still hold a writable handle to the same lock.
#[derive(Debug, Clone)]
pub struct ReadOnlyLock<T> {
    inner: Arc<RwLock<T>>,
}

impl<T> ReadOnlyLock<T> {
    /// Wraps an existing [`Arc<RwLock<T>>`] in a read-only handle.
    pub fn new(lock: Arc<RwLock<T>>) -> Self {
        ReadOnlyLock { inner: lock }
    }

    /// Acquires a shared (read) guard on the underlying [`RwLock`].
    pub fn read(&self) -> LockResult<RwLockReadGuard<'_, T>> {
        self.inner.read()
    }
}

impl<T> From<Arc<RwLock<T>>> for ReadOnlyLock<T> {
    fn from(value: Arc<RwLock<T>>) -> Self {
        ReadOnlyLock::new(value)
    }
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
2 changes: 1 addition & 1 deletion src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::CodecError;
pub type ColumnFamilyName = &'static str;

/// A [`Schema`] is a type-safe interface over a specific column family in a
/// [`DB`](crate::DB). It always a key type ([`KeyCodec`]) and a value type ([`ValueCodec`]).
/// [`DB`](crate::DB). It is always a key type ([`KeyCodec`]) and a value type ([`ValueCodec`]).
pub trait Schema: Debug + Send + Sync + 'static + Sized {
/// The column family name associated with this struct.
/// Note: all schemas within the same SchemaDB must have distinct column family names.
Expand Down
204 changes: 1 addition & 203 deletions tests/iterator_test.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
// Copyright (c) Aptos
// SPDX-License-Identifier: Apache-2.0

use std::collections::HashMap;
use std::sync::{Arc, RwLock};

use rockbound::cache::cache_container::CacheContainer;
use rockbound::cache::cache_db::CacheDb;
use rockbound::schema::{KeyDecoder, KeyEncoder, ValueCodec};
use rockbound::test::{KeyPrefix1, KeyPrefix2, TestCompositeField, TestField};
use rockbound::{
define_schema, Operation, ReadOnlyLock, Schema, SchemaBatch, SchemaIterator, SeekKeyEncoder, DB,
define_schema, Operation, Schema, SchemaBatch, SchemaIterator, SeekKeyEncoder, DB,
};
use rocksdb::DEFAULT_COLUMN_FAMILY_NAME;
use tempfile::TempDir;
Expand Down Expand Up @@ -365,200 +360,3 @@ fn test_schema_batch_iter_range() {
);
assert_eq!(None, iter.next());
}

// Exercises `CacheDb::get_largest` across a snapshot chain backed by a
// `CacheContainer`: local (uncommitted) writes must shadow entries from the
// parent DB, and deleting the local largest key must fall back to the
// previous largest pair.
#[test]
fn test_db_snapshot_get_last_value() {
let db = TestDB::new();
// Parent map: snapshot id -> parent snapshot id. Snapshot 1's parent is 0.
let to_parent = Arc::new(RwLock::new(HashMap::new()));
{
let mut to_parent = to_parent.write().unwrap();
to_parent.insert(1, 0);
}
let manager = Arc::new(RwLock::new(CacheContainer::new(db.db, to_parent.into())));

let snapshot_1 = CacheDb::new(0, ReadOnlyLock::new(manager.clone()));

{
// Before any local writes, the largest pair comes from the backing DB
// (seeded by `TestDB::new()` — presumably (2,0,2) -> 202; see test setup).
let (largest_key_in_db, largest_value_in_db) =
snapshot_1.get_largest::<S>().unwrap().unwrap();
assert_eq!(TestCompositeField(2, 0, 2), largest_key_in_db);
assert_eq!(TestField(202), largest_value_in_db);
}

let key_1 = TestCompositeField(8, 2, 3);
let value_1 = TestField(6);

// A local write with a key larger than anything in the DB becomes the largest.
snapshot_1.put::<S>(&key_1, &value_1).unwrap();

{
let (largest_key, largest_value) = snapshot_1
.get_largest::<S>()
.unwrap()
.expect("largest key-value pair should be found");
assert_eq!(key_1, largest_key);
assert_eq!(value_1, largest_value);
}

// Freeze snapshot 1 into the container so snapshot 2 can read through it.
{
let mut manager = manager.write().unwrap();
manager.add_snapshot(snapshot_1.into()).unwrap();
}

let snapshot_2 = CacheDb::new(1, ReadOnlyLock::new(manager));

{
// Snapshot 2 sees its parent snapshot's write as the largest pair.
let (latest_key, latest_value) = snapshot_2
.get_largest::<S>()
.unwrap()
.expect("largest key-value pair should be found");
assert_eq!(key_1, latest_key);
assert_eq!(value_1, latest_value);
}

// Writing a smaller key locally must NOT change the largest pair.
let key_2 = TestCompositeField(8, 1, 3);
let value_2 = TestField(7);
snapshot_2.put::<S>(&key_2, &value_2).unwrap();
{
let (latest_key, latest_value) = snapshot_2
.get_largest::<S>()
.unwrap()
.expect("largest key-value pair should be found");
assert_eq!(key_1, latest_key);
assert_eq!(value_1, latest_value);
}

// Largest value from local is picked up
let key_3 = TestCompositeField(8, 3, 1);
let value_3 = TestField(8);
snapshot_2.put::<S>(&key_3, &value_3).unwrap();
{
let (latest_key, latest_value) = snapshot_2
.get_largest::<S>()
.unwrap()
.expect("largest key-value pair should be found");
assert_eq!(key_3, latest_key);
assert_eq!(value_3, latest_value);
}

// Deletion: Previous "largest" value is returned
snapshot_2.delete::<S>(&key_3).unwrap();
{
let (latest_key, latest_value) = snapshot_2
.get_largest::<S>()
.unwrap()
.expect("large key-value pair should be found");
assert_eq!(key_1, latest_key);
assert_eq!(value_1, latest_value);
}
}

// Exercises `CacheDb::get_prev` (seek to the greatest key <= the given key)
// across three chained snapshots: exact matches win, newer snapshots shadow
// older ones, and a delete exposes the next-smaller key.
#[test]
fn test_db_cache_container_get_prev_value() {
let tmpdir = tempfile::tempdir().unwrap();
let db = open_inner_db(tmpdir.path());
// Parent chain: snapshot 2 -> snapshot 1 -> snapshot 0.
let to_parent = Arc::new(RwLock::new(HashMap::new()));
{
let mut to_parent = to_parent.write().unwrap();
to_parent.insert(1, 0);
to_parent.insert(2, 1);
}
let cache_container = Arc::new(RwLock::new(CacheContainer::new(db, to_parent.into())));

// Snapshots 1 and 2 are to black box usages of parents iterator
let snapshot_1 = CacheDb::new(0, ReadOnlyLock::new(cache_container.clone()));

let key_1 = TestCompositeField(8, 2, 3);
let key_2 = TestCompositeField(8, 2, 0);
let key_3 = TestCompositeField(8, 3, 2);

// Empty store: nothing at or below key_1.
assert!(snapshot_1.get_prev::<S>(&key_1).unwrap().is_none());

// Seed keys around key_1 so both the "equal" and "strictly previous"
// branches of get_prev are exercised.
snapshot_1.put::<S>(&key_2, &TestField(10)).unwrap();
snapshot_1.put::<S>(&key_1, &TestField(1)).unwrap();
snapshot_1
.put::<S>(&TestCompositeField(8, 1, 3), &TestField(11))
.unwrap();
snapshot_1
.put::<S>(&TestCompositeField(7, 2, 3), &TestField(12))
.unwrap();
snapshot_1
.put::<S>(&TestCompositeField(8, 2, 5), &TestField(13))
.unwrap();
snapshot_1.put::<S>(&key_3, &TestField(14)).unwrap();

// Equal:
assert_eq!(
(key_1.clone(), TestField(1)),
snapshot_1.get_prev::<S>(&key_1).unwrap().unwrap()
);
// Previous: value from 8.2.0
assert_eq!(
(key_2.clone(), TestField(10)),
snapshot_1
.get_prev::<S>(&TestCompositeField(8, 2, 1))
.unwrap()
.unwrap()
);

// Freeze snapshot 1 so snapshot 2 reads it through the container.
{
let mut manager = cache_container.write().unwrap();
manager.add_snapshot(snapshot_1.into()).unwrap();
}

let snapshot_2 = CacheDb::new(1, ReadOnlyLock::new(cache_container.clone()));
// Equal:
assert_eq!(
(key_1.clone(), TestField(1)),
snapshot_2.get_prev::<S>(&key_1).unwrap().unwrap()
);
// Previous: value from 8.2.0
assert_eq!(
(key_2.clone(), TestField(10)),
snapshot_2
.get_prev::<S>(&TestCompositeField(8, 2, 1))
.unwrap()
.unwrap()
);
snapshot_2.put::<S>(&key_2, &TestField(20)).unwrap();
snapshot_2.put::<S>(&key_1, &TestField(2)).unwrap();
// Updated values are higher priority
assert_eq!(
(key_1.clone(), TestField(2)),
snapshot_2.get_prev::<S>(&key_1).unwrap().unwrap()
);
assert_eq!(
(key_2.clone(), TestField(20)),
snapshot_2
.get_prev::<S>(&TestCompositeField(8, 2, 1))
.unwrap()
.unwrap()
);
// Deleting key_1 makes get_prev(key_1) fall back to the next-smaller key_2.
snapshot_2.delete::<S>(&key_1).unwrap();
assert_eq!(
(key_2.clone(), TestField(20)),
snapshot_2.get_prev::<S>(&key_1).unwrap().unwrap()
);
{
let mut manager = cache_container.write().unwrap();
manager.add_snapshot(snapshot_2.into()).unwrap();
}
// Snapshot 3 reads everything through its two ancestors.
let snapshot_3 = CacheDb::new(2, ReadOnlyLock::new(cache_container));
assert_eq!(
(key_2.clone(), TestField(20)),
snapshot_3
.get_prev::<S>(&TestCompositeField(8, 2, 1))
.unwrap()
.unwrap()
);
assert_eq!(
(key_2, TestField(20)),
snapshot_3.get_prev::<S>(&key_1).unwrap().unwrap()
);
assert_eq!(
(key_3, TestField(14)),
snapshot_3
.get_prev::<S>(&TestCompositeField(8, 3, 4))
.unwrap()
.unwrap()
);
}
Loading
Loading