From 677fa2fb325e1adbd7c72405aff9ebdaf1101add Mon Sep 17 00:00:00 2001 From: Ross Weir <29697678+ross-weir@users.noreply.github.com> Date: Tue, 21 May 2024 23:26:35 +1000 Subject: [PATCH] Add `async` operations to DBs (#8) * add tokio dependency * add macros tokio feature for tests * add async methods to cache db * add async methods to base rocksdb wrapper --- Cargo.toml | 1 + src/cache/cache_db.rs | 38 ++++++++++++++++++++++++++ src/lib.rs | 45 +++++++++++++++++++++++++++++++ tests/db_test.rs | 63 +++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 147 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index b0692ec..d6225ad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ proptest-derive = { version = "0.4", optional = true } rocksdb = { version = "0.21" } thiserror = "1" tracing = "0.1" +tokio = { version = "1", features = ["rt-multi-thread", "macros"] } [dev-dependencies] rockbound = { path = ".", features = ["test-utils"] } diff --git a/src/cache/cache_db.rs b/src/cache/cache_db.rs index 67a5a60..fb03d81 100644 --- a/src/cache/cache_db.rs +++ b/src/cache/cache_db.rs @@ -100,6 +100,14 @@ impl CacheDb { parent.get::(local_cache.id(), key) } + /// Get a value from current snapshot, its parents or underlying database + pub async fn read_async( + &self, + key: &impl KeyCodec, + ) -> anyhow::Result> { + tokio::task::block_in_place(|| self.read(key)) + } + /// Get value of largest key written value for given [`Schema`] pub fn get_largest(&self) -> anyhow::Result> { let change_set = self @@ -127,6 +135,11 @@ impl CacheDb { Ok(None) } + /// Get value of largest key written value for given [`Schema`] + pub async fn get_largest_async(&self) -> anyhow::Result> { + tokio::task::block_in_place(|| self.get_largest::()) + } + /// Get largest value in [`Schema`] that is smaller than give `seek_key` pub fn get_prev( &self, @@ -160,6 +173,14 @@ impl CacheDb { Ok(None) } + /// Get largest value in [`Schema`] that is smaller than give `seek_key` + pub async fn get_prev_async( + &self, + seek_key: &impl SeekKeyEncoder, + ) -> anyhow::Result> { + tokio::task::block_in_place(|| self.get_prev(seek_key)) + } + /// Get `n` keys >= `seek_key` pub fn get_n_from_first_match( &self, @@ -206,6 +227,15 @@ impl CacheDb { }) } + /// Get `n` keys >= `seek_key` + pub async fn get_n_from_first_match_async( + &self, + seek_key: &impl SeekKeyEncoder, + n: usize, + ) -> anyhow::Result> { + tokio::task::block_in_place(|| self.get_n_from_first_match(seek_key, n)) + } + /// Get a clone of internal ChangeSet pub fn clone_change_set(&self) -> ChangeSet { let change_set = self @@ -251,6 +281,14 @@ impl CacheDb { Ok(result) } + + /// Collects all key-value pairs in given range, from smallest to largest. + pub async fn collect_in_range_async>( + &self, + range: std::ops::Range, + ) -> anyhow::Result> { + tokio::task::block_in_place(|| self.collect_in_range(range)) + } } /// Iterator over [`CacheDb`] that combines local cache and parent iterators. diff --git a/src/lib.rs b/src/lib.rs index 65e6220..cbe93da 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -142,6 +142,14 @@ impl DB { .map_err(|err| err.into()) } + /// Reads a single record by key asynchronously. + pub async fn get_async( + &self, + schema_key: &impl KeyCodec, + ) -> anyhow::Result> { + tokio::task::block_in_place(|| self.get(schema_key)) + } + /// Writes single record. pub fn put( &self, @@ -155,6 +163,15 @@ impl DB { self.write_schemas(&batch) } + /// Writes a single record asynchronously. + pub async fn put_async( + &self, + key: &impl KeyCodec, + value: &impl ValueCodec, + ) -> anyhow::Result<()> { + tokio::task::block_in_place(|| self.put(key, value)) + } + /// Delete a single key from the database. pub fn delete(&self, key: &impl KeyCodec) -> anyhow::Result<()> { // Not necessary to use a batch, but we'd like a central place to bump counters. @@ -164,6 +181,11 @@ impl DB { self.write_schemas(&batch) } + /// Delete a single key from the database asynchronously. + pub async fn delete_async(&self, key: &impl KeyCodec) -> anyhow::Result<()> { + tokio::task::block_in_place(|| self.delete(key)) + } + /// Removes the database entries in the range `["from", "to")` using default write options. /// /// Note that this operation will be done lexicographic on the *encoding* of the seek keys. It is @@ -181,6 +203,19 @@ impl DB { Ok(()) } + /// Removes the database entries in the range `["from", "to")` using default write options asynchronously. + /// + /// Note that this operation will be done lexicographic on the *encoding* of the seek keys. It is + /// up to the table creator to ensure that the lexicographic ordering of the encoded seek keys matches the + /// logical ordering of the type. + pub async fn delete_range_async( + &self, + from: &impl SeekKeyEncoder, + to: &impl SeekKeyEncoder, + ) -> anyhow::Result<()> { + tokio::task::block_in_place(|| self.delete_range(from, to)) + } + fn iter_with_direction( &self, opts: ReadOptions, @@ -269,6 +304,11 @@ impl DB { Ok(()) } + /// Writes a group of records wrapped in a [`SchemaBatch`] asynchronously. + pub async fn write_schemas_async(&self, batch: &SchemaBatch) -> anyhow::Result<()> { + tokio::task::block_in_place(|| self.write_schemas(batch)) + } + fn get_cf_handle(&self, cf_name: &str) -> anyhow::Result<&rocksdb::ColumnFamily> { self.inner.cf_handle(cf_name).ok_or_else(|| { format_err!( @@ -303,6 +343,11 @@ impl DB { rocksdb::checkpoint::Checkpoint::new(&self.inner)?.create_checkpoint(path)?; Ok(()) } + + /// Creates new physical DB checkpoint in directory specified by `path` asynchronously. + pub async fn create_checkpoint_async>(&self, path: P) -> anyhow::Result<()> { + tokio::task::block_in_place(|| self.create_checkpoint(path)) + } } /// Readability alias for a key in the DB. diff --git a/tests/db_test.rs b/tests/db_test.rs index fdeb33c..1c54ead 100644 --- a/tests/db_test.rs +++ b/tests/db_test.rs @@ -345,3 +345,66 @@ fn test_checkpoint() { assert_eq!(db.get::(&TestField(1)).unwrap(), None); } } + +mod async_tests { + use super::*; + + #[tokio::test(flavor = "multi_thread")] + async fn test_async_ops() { + let tmpdir = tempfile::tempdir().unwrap(); + let db = open_db(tmpdir); + + db.put_async::(&TestField(1), &TestField(55)) + .await + .unwrap(); + assert_eq!( + db.get_async::(&TestField(1)).await.unwrap(), + Some(TestField(55)), + ); + + db.delete_async::(&TestField(1)).await.unwrap(); + assert!(db + .get_async::(&TestField(1)) + .await + .unwrap() + .is_none()); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_checkpoint() { + let tmpdir = tempfile::tempdir().unwrap(); + let checkpoint_parent = tempfile::tempdir().unwrap(); + let checkpoint = checkpoint_parent.path().join("checkpoint"); + { + let db = open_db(&tmpdir); + db.put_async::(&TestField(0), &TestField(0)) + .await + .unwrap(); + db.create_checkpoint_async(&checkpoint).await.unwrap(); + } + { + let db = open_db(&tmpdir); + assert_eq!( + db.get_async::(&TestField(0)).await.unwrap(), + Some(TestField(0)), + ); + + let cp = open_db(&checkpoint); + assert_eq!( + cp.get_async::(&TestField(0)).await.unwrap(), + Some(TestField(0)), + ); + cp.put_async::(&TestField(1), &TestField(1)) + .await + .unwrap(); + assert_eq!( + cp.get_async::(&TestField(1)).await.unwrap(), + Some(TestField(1)), + ); + assert_eq!( + db.get_async::(&TestField(1)).await.unwrap(), + None + ); + } + } +}