Skip to content

Commit

Permalink
Move storage::kv down into storage root module.
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Sep 2, 2023
1 parent 9dadfd1 commit 46b000d
Show file tree
Hide file tree
Showing 11 changed files with 43 additions and 46 deletions.
9 changes: 5 additions & 4 deletions src/bin/toydb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,13 @@ async fn main() -> Result<()> {
};
let raft_state: Box<dyn raft::State> = match cfg.storage_sql.as_str() {
"bitcask" | "" => {
let db = storage::kv::BitCask::new_compact(path.join("state"), cfg.compact_threshold)?;
Box::new(sql::engine::Raft::new_state(storage::kv::MVCC::new(db))?)
let engine =
storage::engine::BitCask::new_compact(path.join("state"), cfg.compact_threshold)?;
Box::new(sql::engine::Raft::new_state(engine)?)
}
"memory" => {
let db = storage::kv::Memory::new();
Box::new(sql::engine::Raft::new_state(storage::kv::MVCC::new(db))?)
let engine = storage::engine::Memory::new();
Box::new(sql::engine::Raft::new_state(engine)?)
}
name => return Err(Error::Config(format!("Unknown SQL storage engine {}", name))),
};
Expand Down
34 changes: 17 additions & 17 deletions src/sql/engine/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,30 @@ use super::super::schema::{Catalog, Table, Tables};
use super::super::types::{Expression, Row, Value};
use super::Transaction as _;
use crate::error::{Error, Result};
use crate::storage::kv;
use crate::storage;

use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::clone::Clone;
use std::collections::HashSet;

/// A SQL engine based on an underlying MVCC key/value store.
pub struct KV<E: kv::Engine> {
pub struct KV<E: storage::Engine> {
/// The underlying key/value store.
pub(super) kv: kv::MVCC<E>,
pub(super) kv: storage::mvcc::MVCC<E>,
}

// FIXME Implement Clone manually due to https://github.com/rust-lang/rust/issues/26925
impl<E: kv::Engine> Clone for KV<E> {
impl<E: storage::Engine> Clone for KV<E> {
fn clone(&self) -> Self {
KV::new(self.kv.clone())
KV { kv: self.kv.clone() }
}
}

impl<E: kv::Engine> KV<E> {
impl<E: storage::Engine> KV<E> {
/// Creates a new key/value-based SQL engine
pub fn new(kv: kv::MVCC<E>) -> Self {
Self { kv }
pub fn new(engine: E) -> Self {
Self { kv: storage::mvcc::MVCC::new(engine) }
}

/// Fetches an unversioned metadata value
Expand All @@ -39,7 +39,7 @@ impl<E: kv::Engine> KV<E> {
}
}

impl<E: kv::Engine> super::Engine for KV<E> {
impl<E: storage::Engine> super::Engine for KV<E> {
type Transaction = Transaction<E>;

fn begin(&self, mode: super::Mode) -> Result<Self::Transaction> {
Expand All @@ -62,13 +62,13 @@ fn deserialize<'a, V: Deserialize<'a>>(bytes: &'a [u8]) -> Result<V> {
}

/// An SQL transaction based on an MVCC key/value transaction
pub struct Transaction<E: kv::Engine> {
txn: kv::mvcc::Transaction<E>,
pub struct Transaction<E: storage::Engine> {
txn: storage::mvcc::Transaction<E>,
}

impl<E: kv::Engine> Transaction<E> {
impl<E: storage::Engine> Transaction<E> {
/// Creates a new SQL transaction from an MVCC transaction
fn new(txn: kv::mvcc::Transaction<E>) -> Self {
fn new(txn: storage::mvcc::Transaction<E>) -> Self {
Self { txn }
}

Expand Down Expand Up @@ -99,7 +99,7 @@ impl<E: kv::Engine> Transaction<E> {
}
}

impl<E: kv::Engine> super::Transaction for Transaction<E> {
impl<E: storage::Engine> super::Transaction for Transaction<E> {
fn id(&self) -> u64 {
self.txn.id()
}
Expand Down Expand Up @@ -270,7 +270,7 @@ impl<E: kv::Engine> super::Transaction for Transaction<E> {
}
}

impl<E: kv::Engine> Catalog for Transaction<E> {
impl<E: storage::Engine> Catalog for Transaction<E> {
fn create_table(&mut self, table: Table) -> Result<()> {
if self.read_table(&table.name)?.is_some() {
return Err(Error::Value(format!("Table {} already exists", table.name)));
Expand Down Expand Up @@ -325,7 +325,7 @@ enum Key<'a> {
impl<'a> Key<'a> {
/// Encodes the key as a byte vector
fn encode(self) -> Vec<u8> {
use kv::encoding::*;
use storage::encoding::*;
match self {
Self::Table(None) => vec![0x01],
Self::Table(Some(name)) => [&[0x01][..], &encode_string(&name)].concat(),
Expand All @@ -348,7 +348,7 @@ impl<'a> Key<'a> {

/// Decodes a key from a byte vector
fn decode(mut bytes: &[u8]) -> Result<Self> {
use kv::encoding::*;
use storage::encoding::*;
let bytes = &mut bytes;
let key = match take_byte(bytes)? {
0x01 => Self::Table(Some(take_string(bytes)?.into())),
Expand Down
2 changes: 1 addition & 1 deletion src/sql/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ impl<E: Engine + 'static> Session<E> {
}

/// The transaction mode
pub type Mode = crate::storage::kv::mvcc::Mode;
pub type Mode = crate::storage::mvcc::Mode;

/// A row scan iterator
pub type Scan = Box<dyn DoubleEndedIterator<Item = Result<Row>> + Send>;
Expand Down
18 changes: 9 additions & 9 deletions src/sql/engine/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use super::super::types::{Expression, Row, Value};
use super::{Engine as _, IndexScan, Mode, Scan, Transaction as _};
use crate::error::{Error, Result};
use crate::raft;
use crate::storage::kv;
use crate::storage;

use serde::{Deserialize, Serialize};
use std::collections::HashSet;
Expand Down Expand Up @@ -58,7 +58,7 @@ enum Query {
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct Status {
pub raft: raft::Status,
pub mvcc: kv::mvcc::Status,
pub mvcc: storage::mvcc::Status,
}

/// An SQL engine that wraps a Raft cluster.
Expand All @@ -74,8 +74,8 @@ impl Raft {
}

/// Creates an underlying state machine for a Raft engine.
pub fn new_state<E: kv::Engine>(kv: kv::MVCC<E>) -> Result<State<E>> {
State::new(kv)
pub fn new_state<E: storage::Engine>(engine: E) -> Result<State<E>> {
State::new(engine)
}

/// Returns Raft SQL engine status.
Expand Down Expand Up @@ -260,16 +260,16 @@ impl Catalog for Transaction {
}

/// The Raft state machine for the Raft-based SQL engine, using a KV SQL engine
pub struct State<E: kv::Engine> {
pub struct State<E: storage::Engine> {
/// The underlying KV SQL engine
engine: super::KV<E>,
/// The last applied index
applied_index: u64,
}

impl<E: kv::Engine> State<E> {
/// Creates a new Raft state maching using the given MVCC key/value store
pub fn new(engine: kv::MVCC<E>) -> Result<Self> {
impl<E: storage::Engine> State<E> {
/// Creates a new Raft state maching using the given storage engine.
pub fn new(engine: E) -> Result<Self> {
let engine = super::KV::new(engine);
let applied_index = engine
.get_metadata(b"applied_index")?
Expand Down Expand Up @@ -305,7 +305,7 @@ impl<E: kv::Engine> State<E> {
}
}

impl<E: kv::Engine> raft::State for State<E> {
impl<E: storage::Engine> raft::State for State<E> {
fn applied_index(&self) -> u64 {
self.applied_index
}
Expand Down
File renamed without changes.
7 changes: 0 additions & 7 deletions src/storage/kv/mod.rs

This file was deleted.

5 changes: 4 additions & 1 deletion src/storage/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
pub mod encoding;
pub mod engine;
pub mod kv;
pub mod log;
pub mod mvcc;

pub use engine::Engine;
2 changes: 1 addition & 1 deletion src/storage/kv/mvcc.rs → src/storage/mvcc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ impl<'a> DoubleEndedIterator for Scan<'a> {

#[cfg(test)]
pub mod tests {
use super::super::Memory;
use super::super::engine::Memory;
use super::*;

fn setup() -> MVCC<Memory> {
Expand Down
4 changes: 2 additions & 2 deletions tests/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use toydb::sql::engine::{Mode, Status};
use toydb::sql::execution::ResultSet;
use toydb::sql::schema;
use toydb::sql::types::{Column, DataType, Value};
use toydb::storage::kv;
use toydb::storage::mvcc;
use toydb::Client;

use pretty_assertions::assert_eq;
Expand Down Expand Up @@ -131,7 +131,7 @@ async fn status() -> Result<()> {
storage: "hybrid".into(),
storage_size: 3239,
},
mvcc: kv::mvcc::Status { txns: 1, txns_active: 0, storage: "memory".into() },
mvcc: mvcc::Status { txns: 1, txns_active: 0, storage: "memory".into() },
}
);
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion tests/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ pub async fn server(
id,
peers,
Box::new(storage::log::Hybrid::new(dir.path(), false)?),
Box::new(sql::engine::Raft::new_state(storage::kv::MVCC::new(storage::kv::Memory::new()))?),
Box::new(sql::engine::Raft::new_state(storage::engine::Memory::new())?),
)
.await?;

Expand Down
6 changes: 3 additions & 3 deletions tests/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ mod schema;

use toydb::error::Result;
use toydb::sql::engine::{Engine, KV};
use toydb::storage::kv;
use toydb::storage;

/// Sets up a basic in-memory SQL engine with an initial dataset.
fn setup(queries: Vec<&str>) -> Result<KV<kv::Memory>> {
let engine = KV::new(kv::MVCC::new(kv::Memory::new()));
fn setup(queries: Vec<&str>) -> Result<KV<storage::engine::Memory>> {
let engine = KV::new(storage::engine::Memory::new());
let mut session = engine.session()?;
session.execute("BEGIN")?;
for query in queries {
Expand Down

0 comments on commit 46b000d

Please sign in to comment.