From 7e010f69365a238279c3353d36394a7a46a58247 Mon Sep 17 00:00:00 2001
From: bsbds <69835502+bsbds@users.noreply.github.com>
Date: Tue, 30 Apr 2024 15:06:22 +0800
Subject: [PATCH] refactor: remove unsafe code in rocksdb

Use rocksdb txn write for all write operations; this may slightly impact
performance.

Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com>
---
 crates/engine/src/api/engine_api.rs           |   4 +-
 crates/engine/src/api/operation.rs            |   1 +
 crates/engine/src/memory_engine/mod.rs        |   2 +-
 crates/engine/src/metrics.rs                  |   4 +-
 crates/engine/src/proxy.rs                    |  10 +-
 crates/engine/src/rocksdb_engine/mod.rs       |   7 +-
 .../engine/src/rocksdb_engine/transaction.rs  | 327 +++++-------------
 7 files changed, 94 insertions(+), 261 deletions(-)

diff --git a/crates/engine/src/api/engine_api.rs b/crates/engine/src/api/engine_api.rs
index 626377f65..6260ae4a9 100644
--- a/crates/engine/src/api/engine_api.rs
+++ b/crates/engine/src/api/engine_api.rs
@@ -8,10 +8,10 @@ pub trait StorageEngine: Send + Sync + 'static + std::fmt::Debug {
     /// The snapshot type
     type Snapshot: SnapshotApi;
     /// The transaction type
-    type Transaction: TransactionApi;
+    type Transaction<'db>: TransactionApi;
 
     /// Creates a transaction
-    fn transaction(&self) -> Self::Transaction;
+    fn transaction(&self) -> Self::Transaction<'_>;
 
     /// Get the value associated with a key value and the given table
     ///
diff --git a/crates/engine/src/api/operation.rs b/crates/engine/src/api/operation.rs
index d351c9990..70a636ff7 100644
--- a/crates/engine/src/api/operation.rs
+++ b/crates/engine/src/api/operation.rs
@@ -1,6 +1,7 @@
 use crate::EngineError;
 
 /// Storage operations
+/// TODO: refactor this trait, require `&mut self` for write operations
 pub trait StorageOps {
     /// Write an op to the transaction
     ///
diff --git a/crates/engine/src/memory_engine/mod.rs b/crates/engine/src/memory_engine/mod.rs
index 7260e9681..c68a3785f 100644
--- a/crates/engine/src/memory_engine/mod.rs
+++ b/crates/engine/src/memory_engine/mod.rs
@@ -58,7 +58,7 @@ impl MemoryEngine {
 #[async_trait::async_trait]
 impl StorageEngine for MemoryEngine {
     type Snapshot = MemorySnapshot;
-    type Transaction = MemoryTransaction;
+    type Transaction<'db> = MemoryTransaction;
 
     #[inline]
     fn transaction(&self) -> MemoryTransaction {
diff --git a/crates/engine/src/metrics.rs b/crates/engine/src/metrics.rs
index f931d6ed6..9a2b72aee 100644
--- a/crates/engine/src/metrics.rs
+++ b/crates/engine/src/metrics.rs
@@ -60,10 +60,10 @@ where
 {
     /// The snapshot type
     type Snapshot = Layer<E::Snapshot>;
-    type Transaction = Layer<E::Transaction>;
+    type Transaction<'db> = Layer<E::Transaction<'db>>;
 
     /// Creates a transaction
-    fn transaction(&self) -> Self::Transaction {
+    fn transaction(&self) -> Self::Transaction<'_> {
         Layer::new(self.engine.transaction())
     }
 
diff --git a/crates/engine/src/proxy.rs b/crates/engine/src/proxy.rs
index fd825a253..2fb74ffdc 100644
--- a/crates/engine/src/proxy.rs
+++ b/crates/engine/src/proxy.rs
@@ -68,10 +68,10 @@ impl Engine {
 #[async_trait::async_trait]
 impl StorageEngine for Engine {
     type Snapshot = Snapshot;
-    type Transaction = Transaction;
+    type Transaction<'db> = Transaction<'db>;
 
     #[inline]
-    fn transaction(&self) -> Transaction {
+    fn transaction(&self) -> Transaction<'_> {
         match *self {
             Engine::Memory(ref e) => Transaction::Memory(e.transaction()),
             Engine::Rocks(ref e) => Transaction::Rocks(e.transaction()),
@@ -166,14 +166,14 @@ impl StorageEngine for Engine {
 /// NOTE: Currently multiple concurrent transactions is not supported
 #[derive(Debug)]
 #[non_exhaustive]
-pub enum Transaction {
+pub enum Transaction<'a> {
     /// Memory transaction
     Memory(MemoryTransaction),
     /// Rocks transaction
-    Rocks(metrics::Layer<RocksTransaction>),
+    Rocks(metrics::Layer<RocksTransaction<'a>>),
 }
 
-impl TransactionApi for Transaction {
+impl TransactionApi for Transaction<'_> {
     #[inline]
     fn commit(self) -> Result<(), EngineError> {
         match self {
diff --git a/crates/engine/src/rocksdb_engine/mod.rs b/crates/engine/src/rocksdb_engine/mod.rs
index 7878f7500..702189508 100644
--- a/crates/engine/src/rocksdb_engine/mod.rs
+++ b/crates/engine/src/rocksdb_engine/mod.rs
@@ -151,11 +151,12 @@ impl RocksEngine {
 #[async_trait::async_trait]
 impl StorageEngine for RocksEngine {
     type Snapshot = RocksSnapshot;
-    type Transaction = RocksTransaction;
+    type Transaction<'db> = RocksTransaction<'db>;
 
     #[inline]
-    fn transaction(&self) -> RocksTransaction {
-        RocksTransaction::new(Arc::clone(&self.inner), Arc::clone(&self.size))
+    fn transaction(&self) -> RocksTransaction<'_> {
+        let txn = self.inner.transaction();
+        RocksTransaction::new(Arc::clone(&self.inner), txn, Arc::clone(&self.size))
     }
 
     #[inline]
diff --git a/crates/engine/src/rocksdb_engine/transaction.rs b/crates/engine/src/rocksdb_engine/transaction.rs
index 6e97e6807..b5a0f6061 100644
--- a/crates/engine/src/rocksdb_engine/transaction.rs
+++ b/crates/engine/src/rocksdb_engine/transaction.rs
@@ -1,44 +1,33 @@
 #![allow(clippy::module_name_repetitions)]
-#![allow(clippy::multiple_inherent_impl)]
 
 use std::{
     iter::repeat,
-    sync::{atomic::AtomicU64, Arc},
+    sync::{
+        atomic::{AtomicU64, AtomicUsize, Ordering},
+        Arc,
+    },
 };
 
-use clippy_utilities::{NumericCast, OverflowArithmetic};
+use clippy_utilities::NumericCast;
 use parking_lot::Mutex;
-use rocksdb::{
-    Direction, IteratorMode, OptimisticTransactionDB, Transaction, WriteBatchWithTransaction,
-    WriteOptions,
-};
+use rocksdb::{Direction, IteratorMode, OptimisticTransactionDB, Transaction};
 
-use crate::{
-    api::transaction_api::TransactionApi, error::EngineError, rocksdb_engine::RocksEngine,
-    StorageOps, WriteOperation,
-};
+use crate::{api::transaction_api::TransactionApi, error::EngineError, StorageOps, WriteOperation};
 
-/// Transaction type for `RocksDB`
-#[derive(Debug)]
-pub struct RocksTransaction {
-    /// Inner state
-    inner: Mutex<Option<Inner>>,
-}
+use super::RocksEngine;
 
-/// Inner state of the transaction
-///
-/// WARN: `db` should never be dropped before `txn`
-struct Inner {
+/// Transaction type for `RocksDB`
+pub struct RocksTransaction<'db> {
     /// The inner DB
     db: Arc<OptimisticTransactionDB>,
     /// A transaction of the DB
-    txn: Option<Transaction<'static, OptimisticTransactionDB>>,
-    /// Cached write operations
-    write_ops: Vec<WriteOperationOwned>,
+    ///
+    /// We need a `Mutex` because `Transaction<'db, DB>` does not implement `Sync`
+    txn: Mutex<Option<Transaction<'db, OptimisticTransactionDB>>>,
     /// The size of the engine
     engine_size: Arc<AtomicU64>,
     /// The size of the txn
-    txn_size: usize,
+    txn_size: AtomicUsize,
 }
 
 /// Write operation
@@ -73,148 +62,53 @@ enum WriteOperationOwned {
     },
 }
 
-impl RocksTransaction {
+impl<'db> RocksTransaction<'db> {
     /// Creates a new `RocksTransaction`
-    pub(super) fn new(db: Arc<OptimisticTransactionDB>, engine_size: Arc<AtomicU64>) -> Self {
-        let inner = Inner {
+    pub(super) fn new(
+        db: Arc<OptimisticTransactionDB>,
+        txn: Transaction<'db, OptimisticTransactionDB>,
+        engine_size: Arc<AtomicU64>,
+    ) -> Self {
+        Self {
             db,
-            txn: None,
-            write_ops: vec![],
+            txn: Mutex::new(Some(txn)),
             engine_size,
-            txn_size: 0,
-        };
-        Self {
-            inner: Mutex::new(Some(inner)),
+            txn_size: AtomicUsize::new(0),
         }
     }
 }
 
-#[allow(clippy::unwrap_used)]
-#[allow(clippy::unwrap_in_result)]
-impl StorageOps for RocksTransaction {
-    fn write(&self, op: WriteOperation<'_>, sync: bool) -> Result<(), EngineError> {
-        self.inner.lock().as_mut().unwrap().write(op, sync)
-    }
-
-    fn write_multi(&self, ops: Vec<WriteOperation<'_>>, sync: bool) -> Result<(), EngineError> {
-        self.inner.lock().as_mut().unwrap().write_multi(ops, sync)
-    }
-
-    fn get(&self, table: &str, key: impl AsRef<[u8]>) -> Result<Option<Vec<u8>>, EngineError> {
-        self.inner.lock().as_mut().unwrap().get(table, key)
-    }
-
-    fn get_multi(
-        &self,
-        table: &str,
-        keys: &[impl AsRef<[u8]>],
-    ) -> Result<Vec<Option<Vec<u8>>>, EngineError> {
-        self.inner.lock().as_mut().unwrap().get_multi(table, keys)
-    }
-}
-
-#[allow(clippy::unwrap_used)]
-#[allow(clippy::unwrap_in_result)]
-impl TransactionApi for RocksTransaction {
-    fn commit(self) -> Result<(), EngineError> {
-        self.inner.lock().take().unwrap().commit()
-    }
-
-    fn rollback(&self) -> Result<(), EngineError> {
-        self.inner.lock().as_mut().unwrap().rollback()
-    }
-}
-
-impl Inner {
-    /// Replace txn with a new transaction
-    #[allow(unsafe_code)]
-    #[allow(clippy::unwrap_used)]
-    fn enable_transaction(&mut self) -> Result<(), EngineError> {
-        if self.txn.is_some() {
-            return Ok(());
-        }
-
-        let txn = self.db.transaction();
-        let txn_static =
-            // SAFETY: In `RocksTransaction` we hold an Arc reference to the DB,
-            // so a `Transaction<'db, DB>` won't outlive the lifetime of the DB.
-            unsafe { std::mem::transmute::<_, Transaction<'static, OptimisticTransactionDB>>(txn) };
-
-        for op in self.write_ops.drain(..).collect::<Vec<_>>() {
-            self.txn_write_op(op, &txn_static)?;
-        }
-
-        self.txn = Some(txn_static);
-
-        Ok(())
-    }
-
-    #[allow(clippy::pattern_type_mismatch)]
-    /// Batch write operation
-    fn batch_write_op(
-        &self,
-        op: WriteOperationOwned,
-        batch: &mut WriteBatchWithTransaction<true>,
-    ) -> Result<(), EngineError> {
-        match op {
-            WriteOperationOwned::Put { table, key, value } => {
-                let cf = self
-                    .db
-                    .cf_handle(&table)
-                    .ok_or(EngineError::TableNotFound(table.clone()))?;
-                batch.put_cf(&cf, key, value);
-            }
-            WriteOperationOwned::Delete { table, key } => {
-                let cf = self
-                    .db
-                    .cf_handle(&table)
-                    .ok_or(EngineError::TableNotFound(table.clone()))?;
-                batch.delete_cf(&cf, key);
-            }
-            WriteOperationOwned::DeleteRange { table, from, to } => {
-                let cf = self
-                    .db
-                    .cf_handle(table.as_ref())
-                    .ok_or_else(|| EngineError::TableNotFound(table.clone()))?;
-                let mode = IteratorMode::From(&from, Direction::Forward);
-                let kvs: Vec<_> = self
-                    .db
-                    .iterator_cf(&cf, mode)
-                    .take_while(|res| {
-                        res.as_ref()
-                            .is_ok_and(|(key, _)| key.as_ref() < to.as_slice())
-                    })
-                    .collect::<Result<Vec<_>, _>>()?;
-                for (key, _) in kvs {
-                    batch.delete_cf(&cf, key);
-                }
-            }
-        }
-
-        Ok(())
-    }
-
-    #[allow(clippy::pattern_type_mismatch)]
-    /// Applies write ops to txn
-    fn txn_write_op(
-        &self,
-        op: WriteOperationOwned,
-        txn: &Transaction<'_, OptimisticTransactionDB>,
-    ) -> Result<(), EngineError> {
-        match op {
+#[allow(clippy::unwrap_used, clippy::unwrap_in_result)] // txn is always `Some`
+impl StorageOps for RocksTransaction<'_> {
+    fn write(&self, op: WriteOperation<'_>, _sync: bool) -> Result<(), EngineError> {
+        match op.into() {
             WriteOperationOwned::Put { table, key, value } => {
                 let cf = self
                     .db
                     .cf_handle(table.as_ref())
                     .ok_or_else(|| EngineError::TableNotFound(table.clone()))?;
-                txn.put_cf(&cf, key, value).map_err(EngineError::from)?;
+                self.txn
+                    .lock()
+                    .as_ref()
+                    .unwrap()
+                    .put_cf(&cf, &key, &value)
+                    .map_err(EngineError::from)?;
+                let _ignore = self.txn_size.fetch_add(
+                    RocksEngine::max_write_size(table.len(), key.len(), value.len()),
+                    Ordering::Relaxed,
+                );
             }
             WriteOperationOwned::Delete { table, key } => {
                 let cf = self
                     .db
                     .cf_handle(table.as_ref())
                     .ok_or_else(|| EngineError::TableNotFound(table.clone()))?;
-                txn.delete_cf(&cf, key).map_err(EngineError::from)?;
+                self.txn
+                    .lock()
+                    .as_ref()
+                    .unwrap()
+                    .delete_cf(&cf, key)
+                    .map_err(EngineError::from)?;
             }
             WriteOperationOwned::DeleteRange { table, from, to } => {
                 let cf = self
@@ -222,7 +116,12 @@ impl Inner {
                     .cf_handle(table.as_ref())
                     .ok_or_else(|| EngineError::TableNotFound(table.clone()))?;
                 let mode = IteratorMode::From(&from, Direction::Forward);
-                let kvs: Vec<_> = txn
+                #[allow(clippy::pattern_type_mismatch)] // can't be fixed
+                let kvs: Vec<_> = self
+                    .txn
+                    .lock()
+                    .as_ref()
+                    .unwrap()
                     .iterator_cf(&cf, mode)
                     .take_while(|res| {
                         res.as_ref()
@@ -230,147 +129,79 @@ impl Inner {
                     })
                    .collect::<Result<Vec<_>, _>>()?;
                 for (key, _) in kvs {
-                    txn.delete_cf(&cf, key)?;
+                    self.txn.lock().as_ref().unwrap().delete_cf(&cf, key)?;
                 }
             }
         }
 
         Ok(())
     }
-}
-
-#[allow(clippy::unwrap_used)]
-#[allow(clippy::unwrap_in_result)]
-impl Inner {
-    /// Write an op to the transaction
-    ///
-    /// # Errors
-    ///
-    /// if error occurs in storage, return `Err(error)`
-    fn write(&mut self, op: WriteOperation<'_>, _sync: bool) -> Result<(), EngineError> {
-        if let Some(ref txn) = self.txn {
-            return self.txn_write_op(op.into(), txn);
-        }
-        #[allow(clippy::pattern_type_mismatch)] // can't be fixed
-        match op {
-            WriteOperation::Put {
-                table,
-                ref key,
-                ref value,
-            } => {
-                self.txn_size = self.txn_size.overflow_add(RocksEngine::max_write_size(
-                    table.len(),
-                    key.len(),
-                    value.len(),
-                ));
-            }
-            WriteOperation::Delete { .. } | WriteOperation::DeleteRange { .. } => {}
-        };
-
-        self.write_ops.push(op.into());
-
-        Ok(())
-    }
-
-    /// Commit a batch of write operations
-    /// If sync is true, the write will be flushed from the operating system
-    /// buffer cache before the write is considered complete. If this
-    /// flag is true, writes will be slower.
-    ///
-    /// # Errors
-    /// Return `EngineError::TableNotFound` if the given table does not exist
-    /// Return `EngineError` if met some errors
-    fn write_multi(&mut self, ops: Vec<WriteOperation<'_>>, sync: bool) -> Result<(), EngineError> {
+    fn write_multi(&self, ops: Vec<WriteOperation<'_>>, sync: bool) -> Result<(), EngineError> {
         for op in ops {
             self.write(op, sync)?;
         }
         Ok(())
     }
 
-    /// Get the value associated with a key value and the given table
-    ///
-    /// # Errors
-    /// Return `EngineError::TableNotFound` if the given table does not exist
-    /// Return `EngineError` if met some errors
-    fn get(&mut self, table: &str, key: impl AsRef<[u8]>) -> Result<Option<Vec<u8>>, EngineError> {
-        self.enable_transaction()?;
+    fn get(&self, table: &str, key: impl AsRef<[u8]>) -> Result<Option<Vec<u8>>, EngineError> {
         let cf = self
             .db
             .cf_handle(table.as_ref())
             .ok_or_else(|| EngineError::TableNotFound(table.to_owned()))?;
-        let txn = self.txn.as_ref().unwrap();
-        txn.get_cf(&cf, key).map_err(EngineError::from)
+        self.txn
+            .lock()
+            .as_ref()
+            .unwrap()
+            .get_cf(&cf, key)
+            .map_err(EngineError::from)
     }
 
-    /// Get the values associated with the given keys
-    ///
-    /// # Errors
-    /// Return `EngineError::TableNotFound` if the given table does not exist
-    /// Return `EngineError` if met some errors
     fn get_multi(
-        &mut self,
+        &self,
         table: &str,
         keys: &[impl AsRef<[u8]>],
     ) -> Result<Vec<Option<Vec<u8>>>, EngineError> {
-        self.enable_transaction()?;
-        let txn = self.txn.as_ref().unwrap();
         let cf = self
             .db
             .cf_handle(table.as_ref())
             .ok_or_else(|| EngineError::TableNotFound(table.to_owned()))?;
 
-        txn.multi_get_cf(repeat(&cf).zip(keys.iter()))
+        self.txn
+            .lock()
+            .as_ref()
+            .unwrap()
+            .multi_get_cf(repeat(&cf).zip(keys.iter()))
             .into_iter()
             .collect::<Result<_, _>>()
             .map_err(EngineError::from)
     }
+}
 
-    /// Commits the changes
-    ///
-    /// # Errors
-    ///
-    /// if error occurs in storage, return `Err(error)`
-    fn commit(mut self) -> Result<(), EngineError> {
+#[allow(clippy::unwrap_used, clippy::unwrap_in_result)] // txn is always `Some`
+impl TransactionApi for RocksTransaction<'_> {
+    fn commit(self) -> Result<(), EngineError> {
         let _ignore = self.engine_size.fetch_add(
-            self.txn_size.numeric_cast(),
-            std::sync::atomic::Ordering::Relaxed,
+            self.txn_size.load(Ordering::Relaxed).numeric_cast(),
+            Ordering::Relaxed,
         );
-        if let Some(txn) = self.txn {
-            return txn.commit().map_err(Into::into);
-        }
-
-        let mut batch = WriteBatchWithTransaction::<true>::default();
-        for op in self.write_ops.drain(..).collect::<Vec<_>>() {
-            self.batch_write_op(op, &mut batch)?;
-        }
-        self.db.write_opt(batch, &WriteOptions::default())?;
-
-        Ok(())
+        self.txn.lock().take().unwrap().commit().map_err(Into::into)
     }
 
-    /// Rollbacks the changes
-    ///
-    /// # Errors
-    ///
-    /// if error occurs in storage, return `Err(error)`
-    fn rollback(&mut self) -> Result<(), EngineError> {
-        if let Some(ref txn) = self.txn {
-            txn.rollback()?;
-            self.txn = None;
-        } else {
-            self.write_ops.clear();
-        }
-
-        Ok(())
+    fn rollback(&self) -> Result<(), EngineError> {
+        self.txn
+            .lock()
+            .as_ref()
+            .unwrap()
+            .rollback()
+            .map_err(Into::into)
     }
 }
 
-#[allow(clippy::missing_fields_in_debug)] // `Transaction` does not implement `Debug`
-impl std::fmt::Debug for Inner {
+impl std::fmt::Debug for RocksTransaction<'_> {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("RocksTransaction")
             .field("db", &self.db)
-            .field("write_ops", &self.write_ops)
             .field("engine_size", &self.engine_size)
             .field("txn_size", &self.txn_size)
             .finish()