Skip to content

Commit

Permalink
Add async operations to DBs (#8)
Browse files Browse the repository at this point in the history
* add tokio dependency

* add macros tokio feature for tests

* add async methods to cache db

* add async methods to base rocksdb wrapper
  • Loading branch information
ross-weir authored May 21, 2024
1 parent 11c7f74 commit 677fa2f
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ proptest-derive = { version = "0.4", optional = true }
rocksdb = { version = "0.21" }
thiserror = "1"
tracing = "0.1"
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }

[dev-dependencies]
rockbound = { path = ".", features = ["test-utils"] }
Expand Down
38 changes: 38 additions & 0 deletions src/cache/cache_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,14 @@ impl CacheDb {
parent.get::<S>(local_cache.id(), key)
}

/// Get a value from current snapshot, its parents or underlying database
pub async fn read_async<S: Schema>(
&self,
key: &impl KeyCodec<S>,
) -> anyhow::Result<Option<S::Value>> {
tokio::task::block_in_place(|| self.read(key))
}

/// Get value of largest key written value for given [`Schema`]
pub fn get_largest<S: Schema>(&self) -> anyhow::Result<Option<(S::Key, S::Value)>> {
let change_set = self
Expand Down Expand Up @@ -127,6 +135,11 @@ impl CacheDb {
Ok(None)
}

/// Get value of largest key written value for given [`Schema`]
pub async fn get_largest_async<S: Schema>(&self) -> anyhow::Result<Option<(S::Key, S::Value)>> {
tokio::task::block_in_place(|| self.get_largest::<S>())
}

/// Get largest value in [`Schema`] that is smaller than give `seek_key`
pub fn get_prev<S: Schema>(
&self,
Expand Down Expand Up @@ -160,6 +173,14 @@ impl CacheDb {
Ok(None)
}

/// Get largest value in [`Schema`] that is smaller than give `seek_key`
pub async fn get_prev_async<S: Schema>(
&self,
seek_key: &impl SeekKeyEncoder<S>,
) -> anyhow::Result<Option<(S::Key, S::Value)>> {
tokio::task::block_in_place(|| self.get_prev(seek_key))
}

/// Get `n` keys >= `seek_key`
pub fn get_n_from_first_match<S: Schema>(
&self,
Expand Down Expand Up @@ -206,6 +227,15 @@ impl CacheDb {
})
}

/// Get `n` keys >= `seek_key`
pub async fn get_n_from_first_match_async<S: Schema>(
&self,
seek_key: &impl SeekKeyEncoder<S>,
n: usize,
) -> anyhow::Result<PaginatedResponse<S>> {
tokio::task::block_in_place(|| self.get_n_from_first_match(seek_key, n))
}

/// Get a clone of internal ChangeSet
pub fn clone_change_set(&self) -> ChangeSet {
let change_set = self
Expand Down Expand Up @@ -251,6 +281,14 @@ impl CacheDb {

Ok(result)
}

/// Collects all key-value pairs in given range, from smallest to largest.
pub async fn collect_in_range_async<S: Schema, Sk: SeekKeyEncoder<S>>(
&self,
range: std::ops::Range<Sk>,
) -> anyhow::Result<Vec<(S::Key, S::Value)>> {
tokio::task::block_in_place(|| self.collect_in_range(range))
}
}

/// Iterator over [`CacheDb`] that combines local cache and parent iterators.
Expand Down
45 changes: 45 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,14 @@ impl DB {
.map_err(|err| err.into())
}

/// Reads a single record by key asynchronously.
pub async fn get_async<S: Schema>(
&self,
schema_key: &impl KeyCodec<S>,
) -> anyhow::Result<Option<S::Value>> {
tokio::task::block_in_place(|| self.get(schema_key))
}

/// Writes single record.
pub fn put<S: Schema>(
&self,
Expand All @@ -155,6 +163,15 @@ impl DB {
self.write_schemas(&batch)
}

/// Writes a single record asynchronously.
pub async fn put_async<S: Schema>(
&self,
key: &impl KeyCodec<S>,
value: &impl ValueCodec<S>,
) -> anyhow::Result<()> {
tokio::task::block_in_place(|| self.put(key, value))
}

/// Delete a single key from the database.
pub fn delete<S: Schema>(&self, key: &impl KeyCodec<S>) -> anyhow::Result<()> {
// Not necessary to use a batch, but we'd like a central place to bump counters.
Expand All @@ -164,6 +181,11 @@ impl DB {
self.write_schemas(&batch)
}

/// Delete a single key from the database asynchronously.
pub async fn delete_async<S: Schema>(&self, key: &impl KeyCodec<S>) -> anyhow::Result<()> {
tokio::task::block_in_place(|| self.delete(key))
}

/// Removes the database entries in the range `["from", "to")` using default write options.
///
/// Note that this operation will be done lexicographic on the *encoding* of the seek keys. It is
Expand All @@ -181,6 +203,19 @@ impl DB {
Ok(())
}

/// Removes the database entries in the range `["from", "to")` using default write options asynchronously.
///
/// Note that this operation will be done lexicographic on the *encoding* of the seek keys. It is
/// up to the table creator to ensure that the lexicographic ordering of the encoded seek keys matches the
/// logical ordering of the type.
pub async fn delete_range_async<S: Schema>(
&self,
from: &impl SeekKeyEncoder<S>,
to: &impl SeekKeyEncoder<S>,
) -> anyhow::Result<()> {
tokio::task::block_in_place(|| self.delete_range(from, to))
}

fn iter_with_direction<S: Schema>(
&self,
opts: ReadOptions,
Expand Down Expand Up @@ -269,6 +304,11 @@ impl DB {
Ok(())
}

/// Writes a group of records wrapped in a [`SchemaBatch`] asynchronously.
pub async fn write_schemas_async(&self, batch: &SchemaBatch) -> anyhow::Result<()> {
tokio::task::block_in_place(|| self.write_schemas(batch))
}

fn get_cf_handle(&self, cf_name: &str) -> anyhow::Result<&rocksdb::ColumnFamily> {
self.inner.cf_handle(cf_name).ok_or_else(|| {
format_err!(
Expand Down Expand Up @@ -303,6 +343,11 @@ impl DB {
rocksdb::checkpoint::Checkpoint::new(&self.inner)?.create_checkpoint(path)?;
Ok(())
}

/// Creates new physical DB checkpoint in directory specified by `path` asynchronously.
pub async fn create_checkpoint_async<P: AsRef<Path>>(&self, path: P) -> anyhow::Result<()> {
tokio::task::block_in_place(|| self.create_checkpoint(path))
}
}

/// Readability alias for a key in the DB.
Expand Down
63 changes: 63 additions & 0 deletions tests/db_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,3 +345,66 @@ fn test_checkpoint() {
assert_eq!(db.get::<TestSchema1>(&TestField(1)).unwrap(), None);
}
}

mod async_tests {
use super::*;

#[tokio::test(flavor = "multi_thread")]
async fn test_async_ops() {
let tmpdir = tempfile::tempdir().unwrap();
let db = open_db(tmpdir);

db.put_async::<TestSchema1>(&TestField(1), &TestField(55))
.await
.unwrap();
assert_eq!(
db.get_async::<TestSchema1>(&TestField(1)).await.unwrap(),
Some(TestField(55)),
);

db.delete_async::<TestSchema1>(&TestField(1)).await.unwrap();
assert!(db
.get_async::<TestSchema1>(&TestField(1))
.await
.unwrap()
.is_none());
}

#[tokio::test(flavor = "multi_thread")]
async fn test_checkpoint() {
let tmpdir = tempfile::tempdir().unwrap();
let checkpoint_parent = tempfile::tempdir().unwrap();
let checkpoint = checkpoint_parent.path().join("checkpoint");
{
let db = open_db(&tmpdir);
db.put_async::<TestSchema1>(&TestField(0), &TestField(0))
.await
.unwrap();
db.create_checkpoint_async(&checkpoint).await.unwrap();
}
{
let db = open_db(&tmpdir);
assert_eq!(
db.get_async::<TestSchema1>(&TestField(0)).await.unwrap(),
Some(TestField(0)),
);

let cp = open_db(&checkpoint);
assert_eq!(
cp.get_async::<TestSchema1>(&TestField(0)).await.unwrap(),
Some(TestField(0)),
);
cp.put_async::<TestSchema1>(&TestField(1), &TestField(1))
.await
.unwrap();
assert_eq!(
cp.get_async::<TestSchema1>(&TestField(1)).await.unwrap(),
Some(TestField(1)),
);
assert_eq!(
db.get_async::<TestSchema1>(&TestField(1)).await.unwrap(),
None
);
}
}
}

0 comments on commit 677fa2f

Please sign in to comment.