Skip to content

Commit

Permalink
refactor: engine
Browse files Browse the repository at this point in the history
Signed-off-by: bsbds <[email protected]>
  • Loading branch information
bsbds authored and Phoenix500526 committed Jul 9, 2024
1 parent 8ffa516 commit 232c9da
Show file tree
Hide file tree
Showing 12 changed files with 570 additions and 339 deletions.
14 changes: 8 additions & 6 deletions crates/curp-test-utils/src/test_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ use curp_external_api::{
cmd::{Command, CommandExecutor, ConflictCheck, PbCodec},
InflightId, LogIndex,
};
use engine::{Engine, EngineType, Snapshot, SnapshotApi, StorageEngine, WriteOperation};
use engine::{
Engine, EngineType, Snapshot, SnapshotApi, StorageEngine, StorageOps, WriteOperation,
};
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use thiserror::Error;
Expand Down Expand Up @@ -249,7 +251,7 @@ impl CommandExecutor<TestCommand> for TestCE {
rev.to_le_bytes().to_vec(),
)];
self.store
.write_batch(wr_ops, true)
.write_multi(wr_ops, true)
.map_err(|e| ExecuteError(e.to_string()))?;
rev
} else {
Expand Down Expand Up @@ -342,7 +344,7 @@ impl CommandExecutor<TestCommand> for TestCE {
})),
);
self.store
.write_batch(wr_ops, true)
.write_multi(wr_ops, true)
.map_err(|e| ExecuteError(e.to_string()))?;
}
debug!(
Expand All @@ -359,7 +361,7 @@ impl CommandExecutor<TestCommand> for TestCE {
index.to_le_bytes().to_vec(),
)];
self.store
.write_batch(ops, true)
.write_multi(ops, true)
.map_err(|e| ExecuteError(e.to_string()))?;
Ok(())
}
Expand Down Expand Up @@ -392,7 +394,7 @@ impl CommandExecutor<TestCommand> for TestCE {
WriteOperation::new_delete(META_TABLE, APPLIED_INDEX_KEY.as_ref()),
];
self.store
.write_batch(ops, true)
.write_multi(ops, true)
.map_err(|e| ExecuteError(e.to_string()))?;
return Ok(());
};
Expand All @@ -402,7 +404,7 @@ impl CommandExecutor<TestCommand> for TestCE {
index.to_le_bytes().to_vec(),
)];
self.store
.write_batch(ops, true)
.write_multi(ops, true)
.map_err(|e| ExecuteError(e.to_string()))?;
snapshot.rewind().unwrap();
self.store
Expand Down
12 changes: 6 additions & 6 deletions crates/curp/src/server/storage/db.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::marker::PhantomData;

use async_trait::async_trait;
use engine::{Engine, EngineType, StorageEngine, WriteOperation};
use engine::{Engine, EngineType, StorageEngine, StorageOps, WriteOperation};
use prost::Message;
use utils::config::EngineConfig;

Expand Down Expand Up @@ -45,7 +45,7 @@ impl<C: Command> StorageApi for DB<C> {
async fn flush_voted_for(&self, term: u64, voted_for: ServerId) -> Result<(), StorageError> {
let bytes = bincode::serialize(&(term, voted_for))?;
let op = WriteOperation::new_put(CF, VOTE_FOR.to_vec(), bytes);
self.db.write_batch(vec![op], true)?;
self.db.write_multi(vec![op], true)?;

Ok(())
}
Expand All @@ -54,7 +54,7 @@ impl<C: Command> StorageApi for DB<C> {
async fn put_log_entry(&self, entry: &LogEntry<Self::Command>) -> Result<(), StorageError> {
let bytes = bincode::serialize(entry)?;
let op = WriteOperation::new_put(LOGS_CF, entry.index.to_le_bytes().to_vec(), bytes);
self.db.write_batch(vec![op], false)?;
self.db.write_multi(vec![op], false)?;

Ok(())
}
Expand All @@ -64,15 +64,15 @@ impl<C: Command> StorageApi for DB<C> {
let id = member.id;
let data = member.encode_to_vec();
let op = WriteOperation::new_put(MEMBERS_CF, id.to_le_bytes().to_vec(), data);
self.db.write_batch(vec![op], true)?;
self.db.write_multi(vec![op], true)?;
Ok(())
}

#[inline]
fn remove_member(&self, id: ServerId) -> Result<(), StorageError> {
let id_bytes = id.to_le_bytes();
let op = WriteOperation::new_delete(MEMBERS_CF, &id_bytes);
self.db.write_batch(vec![op], true)?;
self.db.write_multi(vec![op], true)?;
Ok(())
}

Expand All @@ -96,7 +96,7 @@ impl<C: Command> StorageApi for DB<C> {
m.encode_to_vec(),
));
}
self.db.write_batch(ops, true)?;
self.db.write_multi(ops, true)?;
Ok(())
}

Expand Down
30 changes: 1 addition & 29 deletions crates/engine/src/api/engine_api.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::path::Path;

use crate::{api::snapshot_api::SnapshotApi, error::EngineError, TransactionApi, WriteOperation};
use crate::{api::snapshot_api::SnapshotApi, error::EngineError, TransactionApi};

/// The `StorageEngine` trait
#[async_trait::async_trait]
Expand All @@ -13,41 +13,13 @@ pub trait StorageEngine: Send + Sync + 'static + std::fmt::Debug {
/// Creates a transaction
fn transaction(&self) -> Self::Transaction<'_>;

/// Get the value associated with a key value and the given table
///
/// # Errors
/// Return `EngineError::TableNotFound` if the given table does not exist
/// Return `EngineError` if met some errors
fn get(&self, table: &str, key: impl AsRef<[u8]>) -> Result<Option<Vec<u8>>, EngineError>;

/// Get the values associated with the given keys
///
/// # Errors
/// Return `EngineError::TableNotFound` if the given table does not exist
/// Return `EngineError` if met some errors
fn get_multi(
&self,
table: &str,
keys: &[impl AsRef<[u8]>],
) -> Result<Vec<Option<Vec<u8>>>, EngineError>;

/// Get all the values of the given table
/// # Errors
/// Return `EngineError::TableNotFound` if the given table does not exist
/// Return `EngineError` if met some errors
#[allow(clippy::type_complexity)] // it's clear that (Vec<u8>, Vec<u8>) is a key-value pair
fn get_all(&self, table: &str) -> Result<Vec<(Vec<u8>, Vec<u8>)>, EngineError>;

/// Commit a batch of write operations
/// If sync is true, the write will be flushed from the operating system
/// buffer cache before the write is considered complete. If this
/// flag is true, writes will be slower.
///
/// # Errors
/// Return `EngineError::TableNotFound` if the given table does not exist
/// Return `EngineError` if met some errors
fn write_batch(&self, wr_ops: Vec<WriteOperation<'_>>, sync: bool) -> Result<(), EngineError>;

/// Get a snapshot of the current state of the database
///
/// # Errors
Expand Down
3 changes: 2 additions & 1 deletion crates/engine/src/api/operation.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::EngineError;

/// Storage operations
/// TODO: refactor this trait, require `&mut self` for write operations
pub trait StorageOps {
/// Write an op to the transaction
///
Expand Down Expand Up @@ -42,7 +43,7 @@ pub trait StorageOps {
/// Write operation
#[allow(clippy::module_name_repetitions)]
#[non_exhaustive]
#[derive(Debug)]
#[derive(Clone, Debug)]
pub enum WriteOperation<'a> {
/// `Put` operation
Put {
Expand Down
152 changes: 85 additions & 67 deletions crates/engine/src/memory_engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,18 @@ use std::{

use bytes::{Bytes, BytesMut};
use clippy_utilities::NumericCast;
use parking_lot::RwLock;
use parking_lot::{RwLock, RwLockWriteGuard};
use tokio::io::AsyncWriteExt;
use tokio_util::io::read_buf;

pub(super) use self::transaction::MemoryTransaction;
use crate::{
api::{engine_api::StorageEngine, snapshot_api::SnapshotApi},
error::EngineError,
WriteOperation,
StorageOps, WriteOperation,
};

pub(super) use self::transaction::MemoryTransaction;

/// A helper type to store the key-value pairs for the `MemoryEngine`
type MemoryTable = HashMap<Vec<u8>, Vec<u8>>;

Expand Down Expand Up @@ -53,6 +54,44 @@ impl MemoryEngine {
inner: Arc::new(RwLock::new(db)),
}
}

/// Write an operation
#[inline]
fn write_op(
inner: &mut RwLockWriteGuard<'_, HashMap<String, MemoryTable>>,
op: WriteOperation<'_>,
) -> Result<(), EngineError> {
match op {
WriteOperation::Put { table, key, value } => {
let table = inner
.get_mut(table)
.ok_or_else(|| EngineError::TableNotFound(table.to_owned()))?;
let _ignore = table.insert(key, value);
}
WriteOperation::Delete { table, key } => {
let table = inner
.get_mut(table)
.ok_or_else(|| EngineError::TableNotFound(table.to_owned()))?;
let _ignore = table.remove(key);
}
WriteOperation::DeleteRange { table, from, to } => {
let table = inner
.get_mut(table)
.ok_or_else(|| EngineError::TableNotFound(table.to_owned()))?;
table.retain(|key, _value| {
let key_slice = key.as_slice();
match key_slice.cmp(from) {
Ordering::Less => true,
Ordering::Equal => false,
Ordering::Greater => {
matches!(key_slice.cmp(to), Ordering::Equal | Ordering::Greater)
}
}
});
}
}
Ok(())
}
}

#[async_trait::async_trait]
Expand All @@ -73,32 +112,6 @@ impl StorageEngine for MemoryEngine {
}
}

#[inline]
fn get(&self, table: &str, key: impl AsRef<[u8]>) -> Result<Option<Vec<u8>>, EngineError> {
let inner = self.inner.read();
let table = inner
.get(table)
.ok_or_else(|| EngineError::TableNotFound(table.to_owned()))?;
Ok(table.get(&key.as_ref().to_vec()).cloned())
}

#[inline]
fn get_multi(
&self,
table: &str,
keys: &[impl AsRef<[u8]>],
) -> Result<Vec<Option<Vec<u8>>>, EngineError> {
let inner = self.inner.read();
let table = inner
.get(table)
.ok_or_else(|| EngineError::TableNotFound(table.to_owned()))?;

Ok(keys
.iter()
.map(|key| table.get(&key.as_ref().to_vec()).cloned())
.collect())
}

#[inline]
fn get_all(&self, table: &str) -> Result<Vec<(Vec<u8>, Vec<u8>)>, EngineError> {
let inner = self.inner.read();
Expand All @@ -113,44 +126,6 @@ impl StorageEngine for MemoryEngine {
Ok(values)
}

#[inline]
fn write_batch(&self, wr_ops: Vec<WriteOperation<'_>>, _sync: bool) -> Result<(), EngineError> {
let mut inner = self.inner.write();
for op in wr_ops {
match op {
WriteOperation::Put { table, key, value } => {
let table = inner
.get_mut(table)
.ok_or_else(|| EngineError::TableNotFound(table.to_owned()))?;
let _ignore = table.insert(key, value);
}
WriteOperation::Delete { table, key } => {
let table = inner
.get_mut(table)
.ok_or_else(|| EngineError::TableNotFound(table.to_owned()))?;
let _ignore = table.remove(key);
}
WriteOperation::DeleteRange { table, from, to } => {
let table = inner
.get_mut(table)
.ok_or_else(|| EngineError::TableNotFound(table.to_owned()))?;
table.retain(|key, _value| {
let key_slice = key.as_slice();
match key_slice.cmp(from) {
Ordering::Less => true,
Ordering::Equal => false,
Ordering::Greater => match key_slice.cmp(to) {
Ordering::Less => false,
Ordering::Equal | Ordering::Greater => true,
},
}
});
}
}
}
Ok(())
}

#[inline]
fn get_snapshot(
&self,
Expand Down Expand Up @@ -190,6 +165,49 @@ impl StorageEngine for MemoryEngine {
}
}

impl StorageOps for MemoryEngine {
fn write(&self, op: WriteOperation<'_>, _sync: bool) -> Result<(), EngineError> {
let mut inner = self.inner.write();
Self::write_op(&mut inner, op)
}

#[inline]
fn write_multi<'a, Ops>(&self, ops: Ops, _sync: bool) -> Result<(), EngineError>
where
Ops: IntoIterator<Item = WriteOperation<'a>>,
{
let mut inner = self.inner.write();
for op in ops {
Self::write_op(&mut inner, op)?;
}
Ok(())
}

fn get(&self, table: &str, key: impl AsRef<[u8]>) -> Result<Option<Vec<u8>>, EngineError> {
let inner = self.inner.read();
let table = inner
.get(table)
.ok_or_else(|| EngineError::TableNotFound(table.to_owned()))?;
Ok(table.get(&key.as_ref().to_vec()).cloned())
}

fn get_multi(
&self,
table: &str,
keys: &[impl AsRef<[u8]>],
) -> Result<Vec<Option<Vec<u8>>>, EngineError> {
let inner = self.inner.read();
let table = inner
.get(table)
.ok_or_else(|| EngineError::TableNotFound(table.to_owned()))?;

Ok(keys
.iter()
.map(|key| table.get(&key.as_ref().to_vec()).cloned())
.collect())
}
}

/// A snapshot of the `MemoryEngine`
#[derive(Debug, Default)]
pub struct MemorySnapshot {
Expand Down
Loading

0 comments on commit 232c9da

Please sign in to comment.