Skip to content

Commit

Permalink
add async methods to base rocksdb wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
ross-weir committed May 21, 2024
1 parent 6f0c620 commit b990385
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 0 deletions.
45 changes: 45 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,14 @@ impl DB {
.map_err(|err| err.into())
}

/// Reads a single record by key asynchronously.
pub async fn get_async<S: Schema>(
&self,
schema_key: &impl KeyCodec<S>,
) -> anyhow::Result<Option<S::Value>> {
tokio::task::block_in_place(|| self.get(schema_key))
}

/// Writes single record.
pub fn put<S: Schema>(
&self,
Expand All @@ -155,6 +163,15 @@ impl DB {
self.write_schemas(&batch)
}

/// Writes a single record asynchronously.
pub async fn put_async<S: Schema>(
&self,
key: &impl KeyCodec<S>,
value: &impl ValueCodec<S>,
) -> anyhow::Result<()> {
tokio::task::block_in_place(|| self.put(key, value))
}

/// Delete a single key from the database.
pub fn delete<S: Schema>(&self, key: &impl KeyCodec<S>) -> anyhow::Result<()> {
// Not necessary to use a batch, but we'd like a central place to bump counters.
Expand All @@ -164,6 +181,11 @@ impl DB {
self.write_schemas(&batch)
}

/// Delete a single key from the database asynchronously.
pub async fn delete_async<S: Schema>(&self, key: &impl KeyCodec<S>) -> anyhow::Result<()> {
tokio::task::block_in_place(|| self.delete(key))
}

/// Removes the database entries in the range `["from", "to")` using default write options.
///
/// Note that this operation will be done lexicographic on the *encoding* of the seek keys. It is
Expand All @@ -181,6 +203,19 @@ impl DB {
Ok(())
}

/// Removes the database entries in the range `["from", "to")` using default write options asynchronously.
///
/// Note that this operation will be done lexicographic on the *encoding* of the seek keys. It is
/// up to the table creator to ensure that the lexicographic ordering of the encoded seek keys matches the
/// logical ordering of the type.
pub async fn delete_range_async<S: Schema>(
&self,
from: &impl SeekKeyEncoder<S>,
to: &impl SeekKeyEncoder<S>,
) -> anyhow::Result<()> {
tokio::task::block_in_place(|| self.delete_range(from, to))
}

fn iter_with_direction<S: Schema>(
&self,
opts: ReadOptions,
Expand Down Expand Up @@ -269,6 +304,11 @@ impl DB {
Ok(())
}

/// Writes a group of records wrapped in a [`SchemaBatch`] asynchronously.
pub async fn write_schemas_async(&self, batch: &SchemaBatch) -> anyhow::Result<()> {
tokio::task::block_in_place(|| self.write_schemas(batch))
}

fn get_cf_handle(&self, cf_name: &str) -> anyhow::Result<&rocksdb::ColumnFamily> {
self.inner.cf_handle(cf_name).ok_or_else(|| {
format_err!(
Expand Down Expand Up @@ -303,6 +343,11 @@ impl DB {
rocksdb::checkpoint::Checkpoint::new(&self.inner)?.create_checkpoint(path)?;
Ok(())
}

/// Creates new physical DB checkpoint in directory specified by `path` asynchronously.
pub async fn create_checkpoint_async<P: AsRef<Path>>(&self, path: P) -> anyhow::Result<()> {
tokio::task::block_in_place(|| self.create_checkpoint(path))
}
}

/// Readability alias for a key in the DB.
Expand Down
63 changes: 63 additions & 0 deletions tests/db_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,3 +345,66 @@ fn test_checkpoint() {
assert_eq!(db.get::<TestSchema1>(&TestField(1)).unwrap(), None);
}
}

mod async_tests {
use super::*;

#[tokio::test(flavor = "multi_thread")]
async fn test_async_ops() {
let tmpdir = tempfile::tempdir().unwrap();
let db = open_db(tmpdir);

db.put_async::<TestSchema1>(&TestField(1), &TestField(55))
.await
.unwrap();
assert_eq!(
db.get_async::<TestSchema1>(&TestField(1)).await.unwrap(),
Some(TestField(55)),
);

db.delete_async::<TestSchema1>(&TestField(1)).await.unwrap();
assert!(db
.get_async::<TestSchema1>(&TestField(1))
.await
.unwrap()
.is_none());
}

#[tokio::test(flavor = "multi_thread")]
async fn test_checkpoint() {
let tmpdir = tempfile::tempdir().unwrap();
let checkpoint_parent = tempfile::tempdir().unwrap();
let checkpoint = checkpoint_parent.path().join("checkpoint");
{
let db = open_db(&tmpdir);
db.put_async::<TestSchema1>(&TestField(0), &TestField(0))
.await
.unwrap();
db.create_checkpoint_async(&checkpoint).await.unwrap();
}
{
let db = open_db(&tmpdir);
assert_eq!(
db.get_async::<TestSchema1>(&TestField(0)).await.unwrap(),
Some(TestField(0)),
);

let cp = open_db(&checkpoint);
assert_eq!(
cp.get_async::<TestSchema1>(&TestField(0)).await.unwrap(),
Some(TestField(0)),
);
cp.put_async::<TestSchema1>(&TestField(1), &TestField(1))
.await
.unwrap();
assert_eq!(
cp.get_async::<TestSchema1>(&TestField(1)).await.unwrap(),
Some(TestField(1)),
);
assert_eq!(
db.get_async::<TestSchema1>(&TestField(1)).await.unwrap(),
None
);
}
}
}

0 comments on commit b990385

Please sign in to comment.