Skip to content

Commit

Permalink
sql: return mvcc::TransactionState in StatementResult::Begin
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Jul 21, 2024
1 parent a1d54aa commit df8cabb
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 59 deletions.
10 changes: 5 additions & 5 deletions src/bin/toysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,9 @@ Storage: {keys} keys, {logical_size} MB logical, {nodes}x {disk_size} MB disk,
/// Runs a query and displays the results
fn execute_query(&mut self, query: &str) -> Result<()> {
match self.client.execute(query)? {
StatementResult::Begin { version, read_only } => match read_only {
false => println!("Began transaction at new version {}", version),
true => println!("Began read-only transaction at version {}", version),
StatementResult::Begin { state } => match state.read_only {
false => println!("Began transaction at new version {}", state.version),
true => println!("Began read-only transaction at version {}", state.version),
},
StatementResult::Commit { version: id } => println!("Committed transaction {}", id),
StatementResult::Rollback { version: id } => println!("Rolled back transaction {}", id),
Expand Down Expand Up @@ -214,8 +214,8 @@ Storage: {keys} keys, {logical_size} MB logical, {nodes}x {disk_size} MB disk,
/// Prompts the user for input
fn prompt(&mut self) -> Result<Option<String>> {
let prompt = match self.client.txn() {
Some((version, false)) => format!("toydb:{}> ", version),
Some((version, true)) => format!("toydb@{}> ", version),
Some(txn) if txn.read_only => format!("toydb@{}> ", txn.version),
Some(txn) => format!("toydb:{}> ", txn.version),
None => "toydb> ".into(),
};
match self.editor.readline(&prompt) {
Expand Down
13 changes: 6 additions & 7 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ use crate::error::{Error, Result};
use crate::server::{Request, Response, Status};
use crate::sql::engine::StatementResult;
use crate::sql::types::Table;
use crate::storage::mvcc;

use rand::Rng;

/// A toyDB client
pub struct Client {
reader: std::io::BufReader<std::net::TcpStream>,
writer: std::io::BufWriter<std::net::TcpStream>,
txn: Option<(u64, bool)>,
txn: Option<mvcc::TransactionState>,
}

impl Client {
Expand All @@ -39,9 +40,7 @@ impl Client {
response => return errdata!("unexpected response {response:?}"),
};
match &resultset {
StatementResult::Begin { version, read_only } => {
self.txn = Some((*version, *read_only))
}
StatementResult::Begin { state } => self.txn = Some(state.clone()),
StatementResult::Commit { .. } => self.txn = None,
StatementResult::Rollback { .. } => self.txn = None,
_ => {}
Expand Down Expand Up @@ -73,9 +72,9 @@ impl Client {
}
}

/// Returns the version and read-only state of the txn
pub fn txn(&self) -> Option<(u64, bool)> {
self.txn
/// Returns the transaction state.
pub fn txn(&self) -> Option<&mvcc::TransactionState> {
self.txn.as_ref()
}

/// Runs the given closure, automatically retrying serialization and abort
Expand Down
10 changes: 8 additions & 2 deletions src/sql/engine/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,16 @@ pub trait Engine<'a>: Sized {
/// cost. With the Raft engine, each call results in a Raft roundtrip, and we'd
/// rather not have to do that for every single row that's modified.
pub trait Transaction {
/// The transaction's internal MVCC state.
fn state(&self) -> &mvcc::TransactionState;
/// The transaction's MVCC version. Unique for read/write transactions.
fn version(&self) -> mvcc::Version;
fn version(&self) -> mvcc::Version {
self.state().version
}
/// Whether the transaction is read-only.
fn read_only(&self) -> bool;
fn read_only(&self) -> bool {
self.state().read_only
}

/// Commits the transaction.
fn commit(self) -> Result<()>;
Expand Down
8 changes: 2 additions & 6 deletions src/sql/engine/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,8 @@ impl<E: storage::Engine> Transaction<E> {
}

impl<E: storage::Engine> super::Transaction for Transaction<E> {
fn version(&self) -> u64 {
self.txn.version()
}

fn read_only(&self) -> bool {
self.txn.read_only()
fn state(&self) -> &mvcc::TransactionState {
self.txn.state()
}

fn commit(self) -> Result<()> {
Expand Down
8 changes: 2 additions & 6 deletions src/sql/engine/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,8 @@ impl<'a> Transaction<'a> {
}

impl<'a> super::Transaction for Transaction<'a> {
fn version(&self) -> mvcc::Version {
self.state.version
}

fn read_only(&self) -> bool {
self.state.read_only
fn state(&self) -> &mvcc::TransactionState {
&self.state
}

fn commit(self) -> Result<()> {
Expand Down
36 changes: 15 additions & 21 deletions src/sql/engine/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,27 +32,21 @@ impl<'a, E: Engine<'a>> Session<'a, E> {
// Parse and execute the statement. Transaction control is done here,
// other statements are executed by the SQL engine.
Ok(match Parser::new(statement).parse()? {
ast::Statement::Begin { .. } if self.txn.is_some() => {
return errinput!("already in a transaction")
}
ast::Statement::Begin { read_only: false, as_of: Some(_) } => {
return errinput!("can't start read-write transaction in a given version")
}
ast::Statement::Begin { read_only: false, as_of: None } => {
let txn = self.engine.begin()?;
let version = txn.version();
self.txn = Some(txn);
StatementResult::Begin { version, read_only: false }
}
ast::Statement::Begin { read_only: true, as_of: None } => {
let txn = self.engine.begin_read_only()?;
let version = txn.version();
ast::Statement::Begin { read_only, as_of } => {
if self.txn.is_some() {
return errinput!("already in a transaction");
}
let txn = match (read_only, as_of) {
(false, None) => self.engine.begin()?,
(true, None) => self.engine.begin_read_only()?,
(true, Some(as_of)) => self.engine.begin_as_of(as_of)?,
(false, Some(_)) => {
return errinput!("can't start read-write transaction in a given version")
}
};
let state = txn.state().clone();
self.txn = Some(txn);
StatementResult::Begin { version, read_only: true }
}
ast::Statement::Begin { read_only: true, as_of: Some(version) } => {
self.txn = Some(self.engine.begin_as_of(version)?);
StatementResult::Begin { version, read_only: true }
StatementResult::Begin { state }
}
ast::Statement::Commit => {
let Some(txn) = self.txn.take() else {
Expand Down Expand Up @@ -131,7 +125,7 @@ impl<'a, E: Engine<'a>> Drop for Session<'a, E> {
/// A session statement result. Sent across the wire to SQL clients.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub enum StatementResult {
Begin { version: mvcc::Version, read_only: bool },
Begin { state: mvcc::TransactionState },
Commit { version: mvcc::Version },
Rollback { version: mvcc::Version },
Explain(Plan),
Expand Down
74 changes: 62 additions & 12 deletions tests/e2e/client.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::BTreeSet;

use super::{assert_row, assert_rows, dataset, TestCluster};

use toydb::error::{Error, Result};
Expand Down Expand Up @@ -240,8 +242,16 @@ fn execute_txn() -> Result<()> {
assert_eq!(c.txn(), None);

// Committing a change in a txn should work
assert_eq!(c.execute("BEGIN")?, StatementResult::Begin { version: 2, read_only: false });
assert_eq!(c.txn(), Some((2, false)));
assert_eq!(
c.execute("BEGIN")?,
StatementResult::Begin {
state: mvcc::TransactionState { version: 2, read_only: false, active: BTreeSet::new() }
}
);
assert_eq!(
c.txn(),
Some(&mvcc::TransactionState { version: 2, read_only: false, active: BTreeSet::new() })
);
c.execute("INSERT INTO genres VALUES (4, 'Drama')")?;
assert_eq!(c.execute("COMMIT")?, StatementResult::Commit { version: 2 });
assert_eq!(c.txn(), None);
Expand All @@ -252,8 +262,16 @@ fn execute_txn() -> Result<()> {
assert_eq!(c.txn(), None);

// Rolling back a change in a txn should also work
assert_eq!(c.execute("BEGIN")?, StatementResult::Begin { version: 3, read_only: false });
assert_eq!(c.txn(), Some((3, false)));
assert_eq!(
c.execute("BEGIN")?,
StatementResult::Begin {
state: mvcc::TransactionState { version: 3, read_only: false, active: BTreeSet::new() }
}
);
assert_eq!(
c.txn(),
Some(&mvcc::TransactionState { version: 3, read_only: false, active: BTreeSet::new() })
);
c.execute("INSERT INTO genres VALUES (5, 'Musical')")?;
assert_row(
c.execute("SELECT * FROM genres WHERE id = 5")?,
Expand All @@ -266,9 +284,14 @@ fn execute_txn() -> Result<()> {
// Starting a read-only txn should block writes
assert_eq!(
c.execute("BEGIN READ ONLY")?,
StatementResult::Begin { version: 4, read_only: true }
StatementResult::Begin {
state: mvcc::TransactionState { version: 4, read_only: true, active: BTreeSet::new() }
}
);
assert_eq!(
c.txn(),
Some(&mvcc::TransactionState { version: 4, read_only: true, active: BTreeSet::new() })
);
assert_eq!(c.txn(), Some((4, true)));
assert_row(
c.execute("SELECT * FROM genres WHERE id = 4")?,
vec![Value::Integer(4), Value::String("Drama".into())],
Expand All @@ -284,9 +307,14 @@ fn execute_txn() -> Result<()> {
// block writes
assert_eq!(
c.execute("BEGIN READ ONLY AS OF SYSTEM TIME 2")?,
StatementResult::Begin { version: 2, read_only: true },
StatementResult::Begin {
state: mvcc::TransactionState { version: 2, read_only: true, active: BTreeSet::new() }
},
);
assert_eq!(
c.txn(),
Some(&mvcc::TransactionState { version: 2, read_only: true, active: BTreeSet::new() })
);
assert_eq!(c.txn(), Some((2, true)));
assert_rows(
c.execute("SELECT * FROM genres")?,
vec![
Expand All @@ -299,13 +327,21 @@ fn execute_txn() -> Result<()> {
assert_eq!(c.execute("COMMIT")?, StatementResult::Commit { version: 2 });

// A txn should still be usable after an error occurs
assert_eq!(c.execute("BEGIN")?, StatementResult::Begin { version: 4, read_only: false });
assert_eq!(
c.execute("BEGIN")?,
StatementResult::Begin {
state: mvcc::TransactionState { version: 4, read_only: false, active: BTreeSet::new() }
},
);
c.execute("INSERT INTO genres VALUES (5, 'Horror')")?;
assert_eq!(
c.execute("INSERT INTO genres VALUES (5, 'Musical')"),
Err(Error::InvalidInput("primary key 5 already exists".into()))
);
assert_eq!(c.txn(), Some((4, false)));
assert_eq!(
c.txn(),
Some(&mvcc::TransactionState { version: 4, read_only: false, active: BTreeSet::new() })
);
c.execute("INSERT INTO genres VALUES (6, 'Western')")?;
assert_eq!(c.execute("COMMIT")?, StatementResult::Commit { version: 4 });
assert_rows(
Expand All @@ -331,8 +367,22 @@ fn execute_txn_concurrent() -> Result<()> {
let mut b = tc.connect_any()?;

// Concurrent updates should throw a serialization failure on conflict.
assert_eq!(a.execute("BEGIN")?, StatementResult::Begin { version: 2, read_only: false });
assert_eq!(b.execute("BEGIN")?, StatementResult::Begin { version: 3, read_only: false });
assert_eq!(
a.execute("BEGIN")?,
StatementResult::Begin {
state: mvcc::TransactionState { version: 2, read_only: false, active: BTreeSet::new() }
},
);
assert_eq!(
b.execute("BEGIN")?,
StatementResult::Begin {
state: mvcc::TransactionState {
version: 3,
read_only: false,
active: BTreeSet::from([2])
}
},
);

assert_row(
a.execute("SELECT * FROM genres WHERE id = 1")?,
Expand Down

0 comments on commit df8cabb

Please sign in to comment.