diff --git a/src/lib.rs b/src/lib.rs index 65e6220..cbe93da 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -142,6 +142,14 @@ impl DB { .map_err(|err| err.into()) } + /// Reads a single record by key asynchronously. + pub async fn get_async( + &self, + schema_key: &impl KeyCodec, + ) -> anyhow::Result> { + tokio::task::block_in_place(|| self.get(schema_key)) + } + /// Writes single record. pub fn put( &self, @@ -155,6 +163,15 @@ impl DB { self.write_schemas(&batch) } + /// Writes a single record asynchronously. + pub async fn put_async( + &self, + key: &impl KeyCodec, + value: &impl ValueCodec, + ) -> anyhow::Result<()> { + tokio::task::block_in_place(|| self.put(key, value)) + } + /// Delete a single key from the database. pub fn delete(&self, key: &impl KeyCodec) -> anyhow::Result<()> { // Not necessary to use a batch, but we'd like a central place to bump counters. @@ -164,6 +181,11 @@ impl DB { self.write_schemas(&batch) } + /// Delete a single key from the database asynchronously. + pub async fn delete_async(&self, key: &impl KeyCodec) -> anyhow::Result<()> { + tokio::task::block_in_place(|| self.delete(key)) + } + /// Removes the database entries in the range `["from", "to")` using default write options. /// /// Note that this operation will be done lexicographic on the *encoding* of the seek keys. It is @@ -181,6 +203,19 @@ impl DB { Ok(()) } + /// Removes the database entries in the range `["from", "to")` using default write options asynchronously. + /// + /// Note that this operation will be done lexicographic on the *encoding* of the seek keys. It is + /// up to the table creator to ensure that the lexicographic ordering of the encoded seek keys matches the + /// logical ordering of the type. + pub async fn delete_range_async( + &self, + from: &impl SeekKeyEncoder, + to: &impl SeekKeyEncoder, + ) -> anyhow::Result<()> { + tokio::task::block_in_place(|| self.delete_range(from, to)) + } + fn iter_with_direction( &self, opts: ReadOptions, @@ -269,6 +304,11 @@ impl DB { Ok(()) } + /// Writes a group of records wrapped in a [`SchemaBatch`] asynchronously. + pub async fn write_schemas_async(&self, batch: &SchemaBatch) -> anyhow::Result<()> { + tokio::task::block_in_place(|| self.write_schemas(batch)) + } + fn get_cf_handle(&self, cf_name: &str) -> anyhow::Result<&rocksdb::ColumnFamily> { self.inner.cf_handle(cf_name).ok_or_else(|| { format_err!( @@ -303,6 +343,11 @@ impl DB { rocksdb::checkpoint::Checkpoint::new(&self.inner)?.create_checkpoint(path)?; Ok(()) } + + /// Creates new physical DB checkpoint in directory specified by `path` asynchronously. + pub async fn create_checkpoint_async>(&self, path: P) -> anyhow::Result<()> { + tokio::task::block_in_place(|| self.create_checkpoint(path)) + } } /// Readability alias for a key in the DB. diff --git a/tests/db_test.rs b/tests/db_test.rs index fdeb33c..1c54ead 100644 --- a/tests/db_test.rs +++ b/tests/db_test.rs @@ -345,3 +345,66 @@ fn test_checkpoint() { assert_eq!(db.get::(&TestField(1)).unwrap(), None); } } + +mod async_tests { + use super::*; + + #[tokio::test(flavor = "multi_thread")] + async fn test_async_ops() { + let tmpdir = tempfile::tempdir().unwrap(); + let db = open_db(tmpdir); + + db.put_async::(&TestField(1), &TestField(55)) + .await + .unwrap(); + assert_eq!( + db.get_async::(&TestField(1)).await.unwrap(), + Some(TestField(55)), + ); + + db.delete_async::(&TestField(1)).await.unwrap(); + assert!(db + .get_async::(&TestField(1)) + .await + .unwrap() + .is_none()); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_checkpoint() { + let tmpdir = tempfile::tempdir().unwrap(); + let checkpoint_parent = tempfile::tempdir().unwrap(); + let checkpoint = checkpoint_parent.path().join("checkpoint"); + { + let db = open_db(&tmpdir); + db.put_async::(&TestField(0), &TestField(0)) + .await + .unwrap(); + db.create_checkpoint_async(&checkpoint).await.unwrap(); + } + { + let db = open_db(&tmpdir); + assert_eq!( + db.get_async::(&TestField(0)).await.unwrap(), + Some(TestField(0)), + ); + + let cp = open_db(&checkpoint); + assert_eq!( + cp.get_async::(&TestField(0)).await.unwrap(), + Some(TestField(0)), + ); + cp.put_async::(&TestField(1), &TestField(1)) + .await + .unwrap(); + assert_eq!( + cp.get_async::(&TestField(1)).await.unwrap(), + Some(TestField(1)), + ); + assert_eq!( + db.get_async::(&TestField(1)).await.unwrap(), + None + ); + } + } +}