Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement memory transaction #702

Merged
merged 1 commit into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions crates/engine/src/memory_engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::{
type MemoryTable = HashMap<Vec<u8>, Vec<u8>>;

/// Memory Storage Engine Implementation
#[derive(Debug, Default)]
#[derive(Clone, Debug, Default)]
pub struct MemoryEngine {
/// The inner storage engine of `MemoryStorage`
inner: Arc<RwLock<HashMap<String, MemoryTable>>>,
Expand Down Expand Up @@ -62,7 +62,15 @@ impl StorageEngine for MemoryEngine {

#[inline]
fn transaction(&self) -> MemoryTransaction {
MemoryTransaction {}
let inner_r = self.inner.read();
let mut state = HashMap::new();
for table in inner_r.keys() {
let _ignore = state.insert(table.clone(), HashMap::new());
}
MemoryTransaction {
db: self.clone(),
state: RwLock::new(state),
}
}

#[inline]
Expand Down
146 changes: 141 additions & 5 deletions crates/engine/src/memory_engine/transaction.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,154 @@
#![allow(clippy::module_name_repetitions)]
#![allow(clippy::multiple_inherent_impl)]

use crate::TransactionApi;
use std::{cmp::Ordering, collections::HashMap};

use parking_lot::{RwLock, RwLockWriteGuard};

use crate::{
api::transaction_api::TransactionApi, error::EngineError, memory_engine::MemoryEngine,
StorageOps, WriteOperation,
};

/// The memory table used in transaction state
type StateMemoryTable = HashMap<Vec<u8>, Option<Vec<u8>>>;

/// A transaction of the `MemoryEngine`
#[derive(Copy, Clone, Debug, Default)]
pub struct MemoryTransaction;
#[derive(Debug, Default)]
pub struct MemoryTransaction {
/// The memory engine
pub(super) db: MemoryEngine,
/// The inner storage engine of `MemoryStorage`
pub(super) state: RwLock<HashMap<String, StateMemoryTable>>,
}

impl StorageOps for MemoryTransaction {
fn write(&self, op: WriteOperation<'_>, _sync: bool) -> Result<(), EngineError> {
let mut state_w = self.state.write();
self.write_op(&mut state_w, op)
}

fn write_multi(&self, ops: Vec<WriteOperation<'_>>, _sync: bool) -> Result<(), EngineError> {
let mut state_w = self.state.write();
for op in ops {
self.write_op(&mut state_w, op)?;
}
Ok(())
}

fn get(&self, table: &str, key: impl AsRef<[u8]>) -> Result<Option<Vec<u8>>, EngineError> {
let state_r = self.state.read();
let state_table = state_r
.get(table)
.ok_or_else(|| EngineError::TableNotFound(table.to_owned()))?;

if let Some(val) = state_table.get(key.as_ref()) {
return Ok(val.clone());
}

let db_inner_r = self.db.inner.read();
let db_table = db_inner_r
.get(table)
.ok_or_else(|| EngineError::TableNotFound(table.to_owned()))?;
Phoenix500526 marked this conversation as resolved.
Show resolved Hide resolved

Ok(db_table.get(key.as_ref()).cloned())
}

fn get_multi(
&self,
table: &str,
keys: &[impl AsRef<[u8]>],
) -> Result<Vec<Option<Vec<u8>>>, EngineError> {
keys.iter().map(|key| self.get(table, key)).collect()
}
}

impl TransactionApi for MemoryTransaction {
fn commit(self) -> Result<(), crate::EngineError> {
fn commit(self) -> Result<(), EngineError> {
let mut state_w = self.state.write();
let mut db_inner_w = self.db.inner.write();
for (name, mut table) in state_w.drain() {
let db_table = db_inner_w
.get_mut(&name)
.ok_or_else(|| EngineError::TableNotFound(name.clone()))?;

for (key, val_opt) in table.drain() {
if let Some(val) = val_opt {
let _ignore = db_table.insert(key, val);
} else {
let _ignore = db_table.remove(&key);
}
}
}

Ok(())
}

fn rollback(&self) -> Result<(), crate::EngineError> {
fn rollback(&self) -> Result<(), EngineError> {
let mut state_w = self.state.write();
for table in state_w.values_mut() {
table.clear();
}

Ok(())
}
}

impl MemoryTransaction {
/// Write an op to the transaction
fn write_op(
&self,
state_w: &mut RwLockWriteGuard<'_, HashMap<String, StateMemoryTable>>,
op: WriteOperation<'_>,
) -> Result<(), EngineError> {
match op {
WriteOperation::Put { table, key, value } => {
let table = state_w
.get_mut(table)
.ok_or_else(|| EngineError::TableNotFound(table.to_owned()))?;
let _ignore = table.insert(key, Some(value));
}
WriteOperation::Delete { table, key } => {
let table = state_w
.get_mut(table)
.ok_or_else(|| EngineError::TableNotFound(table.to_owned()))?;
let _ignore = table.insert(key.to_vec(), None);
}
WriteOperation::DeleteRange { table, from, to } => {
let db_inner_r = self.db.inner.read();
let db_table = db_inner_r
.get(table)
.ok_or_else(|| EngineError::TableNotFound(table.to_owned()))?;
let state_table = state_w
.get(table)
.ok_or_else(|| EngineError::TableNotFound(table.to_owned()))?;

let to_delete: Vec<_> = db_table
.keys()
.chain(state_table.keys())
.filter(|key| {
let key_slice = key.as_slice();
match key_slice.cmp(from) {
Ordering::Less => false,
Ordering::Equal => true,
Ordering::Greater => match key_slice.cmp(to) {
Ordering::Less => true,
Ordering::Equal | Ordering::Greater => false,
},
}
})
.cloned()
.collect();

let table = state_w
.get_mut(table)
.ok_or_else(|| EngineError::TableNotFound(table.to_owned()))?;
// `None` works as a tombstone of the key
for key in to_delete {
let _ignore = table.insert(key.clone(), None);
}
}
}
Ok(())
}
}
Loading