diff --git a/src/bin/toysql.rs b/src/bin/toysql.rs index 1285401e5..e28cdd8bb 100644 --- a/src/bin/toysql.rs +++ b/src/bin/toysql.rs @@ -184,9 +184,9 @@ Storage: {keys} keys, {logical_size} MB logical, {nodes}x {disk_size} MB disk, /// Runs a query and displays the results fn execute_query(&mut self, query: &str) -> Result<()> { match self.client.execute(query)? { - StatementResult::Begin { version, read_only } => match read_only { - false => println!("Began transaction at new version {}", version), - true => println!("Began read-only transaction at version {}", version), + StatementResult::Begin { state } => match state.read_only { + false => println!("Began transaction at new version {}", state.version), + true => println!("Began read-only transaction at version {}", state.version), }, StatementResult::Commit { version: id } => println!("Committed transaction {}", id), StatementResult::Rollback { version: id } => println!("Rolled back transaction {}", id), @@ -214,8 +214,8 @@ Storage: {keys} keys, {logical_size} MB logical, {nodes}x {disk_size} MB disk, /// Prompts the user for input fn prompt(&mut self) -> Result> { let prompt = match self.client.txn() { - Some((version, false)) => format!("toydb:{}> ", version), - Some((version, true)) => format!("toydb@{}> ", version), + Some(txn) if txn.read_only => format!("toydb@{}> ", txn.version), + Some(txn) => format!("toydb:{}> ", txn.version), None => "toydb> ".into(), }; match self.editor.readline(&prompt) { diff --git a/src/client.rs b/src/client.rs index c724de082..5c3ac46d9 100644 --- a/src/client.rs +++ b/src/client.rs @@ -6,6 +6,7 @@ use crate::error::{Error, Result}; use crate::server::{Request, Response, Status}; use crate::sql::engine::StatementResult; use crate::sql::types::Table; +use crate::storage::mvcc; use rand::Rng; @@ -13,7 +14,7 @@ use rand::Rng; pub struct Client { reader: std::io::BufReader, writer: std::io::BufWriter, - txn: Option<(u64, bool)>, + txn: Option, } impl Client { @@ -39,9 +40,7 @@ impl Client { response => return errdata!("unexpected response {response:?}"), }; match &resultset { - StatementResult::Begin { version, read_only } => { - self.txn = Some((*version, *read_only)) - } + StatementResult::Begin { state } => self.txn = Some(state.clone()), StatementResult::Commit { .. } => self.txn = None, StatementResult::Rollback { .. } => self.txn = None, _ => {} @@ -73,9 +72,9 @@ impl Client { } } - /// Returns the version and read-only state of the txn - pub fn txn(&self) -> Option<(u64, bool)> { - self.txn + /// Returns the transaction state. + pub fn txn(&self) -> Option<&mvcc::TransactionState> { + self.txn.as_ref() } /// Runs the given closure, automatically retrying serialization and abort diff --git a/src/sql/engine/engine.rs b/src/sql/engine/engine.rs index 449fd95bf..c4d884d5b 100644 --- a/src/sql/engine/engine.rs +++ b/src/sql/engine/engine.rs @@ -38,10 +38,16 @@ pub trait Engine<'a>: Sized { /// cost. With the Raft engine, each call results in a Raft roundtrip, and we'd /// rather not have to do that for every single row that's modified. pub trait Transaction { + /// The transaction's internal MVCC state. + fn state(&self) -> &mvcc::TransactionState; /// The transaction's MVCC version. Unique for read/write transactions. - fn version(&self) -> mvcc::Version; + fn version(&self) -> mvcc::Version { + self.state().version + } /// Whether the transaction is read-only. - fn read_only(&self) -> bool; + fn read_only(&self) -> bool { + self.state().read_only + } /// Commits the transaction. fn commit(self) -> Result<()>; diff --git a/src/sql/engine/local.rs b/src/sql/engine/local.rs index e025c1728..6553122fc 100644 --- a/src/sql/engine/local.rs +++ b/src/sql/engine/local.rs @@ -145,12 +145,8 @@ impl Transaction { } impl super::Transaction for Transaction { - fn version(&self) -> u64 { - self.txn.version() - } - - fn read_only(&self) -> bool { - self.txn.read_only() + fn state(&self) -> &mvcc::TransactionState { + self.txn.state() } fn commit(self) -> Result<()> { diff --git a/src/sql/engine/raft.rs b/src/sql/engine/raft.rs index 37fdf67ef..e19d76ce6 100644 --- a/src/sql/engine/raft.rs +++ b/src/sql/engine/raft.rs @@ -127,12 +127,8 @@ impl<'a> Transaction<'a> { } impl<'a> super::Transaction for Transaction<'a> { - fn version(&self) -> mvcc::Version { - self.state.version - } - - fn read_only(&self) -> bool { - self.state.read_only + fn state(&self) -> &mvcc::TransactionState { + &self.state } fn commit(self) -> Result<()> { diff --git a/src/sql/engine/session.rs b/src/sql/engine/session.rs index 5d6aaf1e9..2d14a689f 100644 --- a/src/sql/engine/session.rs +++ b/src/sql/engine/session.rs @@ -32,27 +32,21 @@ impl<'a, E: Engine<'a>> Session<'a, E> { // Parse and execute the statement. Transaction control is done here, // other statements are executed by the SQL engine. Ok(match Parser::new(statement).parse()? { - ast::Statement::Begin { .. } if self.txn.is_some() => { - return errinput!("already in a transaction") - } - ast::Statement::Begin { read_only: false, as_of: Some(_) } => { - return errinput!("can't start read-write transaction in a given version") - } - ast::Statement::Begin { read_only: false, as_of: None } => { - let txn = self.engine.begin()?; - let version = txn.version(); - self.txn = Some(txn); - StatementResult::Begin { version, read_only: false } - } - ast::Statement::Begin { read_only: true, as_of: None } => { - let txn = self.engine.begin_read_only()?; - let version = txn.version(); + ast::Statement::Begin { read_only, as_of } => { + if self.txn.is_some() { + return errinput!("already in a transaction"); + } + let txn = match (read_only, as_of) { + (false, None) => self.engine.begin()?, + (true, None) => self.engine.begin_read_only()?, + (true, Some(as_of)) => self.engine.begin_as_of(as_of)?, + (false, Some(_)) => { + return errinput!("can't start read-write transaction in a given version") + } + }; + let state = txn.state().clone(); self.txn = Some(txn); - StatementResult::Begin { version, read_only: true } - } - ast::Statement::Begin { read_only: true, as_of: Some(version) } => { - self.txn = Some(self.engine.begin_as_of(version)?); - StatementResult::Begin { version, read_only: true } + StatementResult::Begin { state } } ast::Statement::Commit => { let Some(txn) = self.txn.take() else { @@ -131,7 +125,7 @@ impl<'a, E: Engine<'a>> Drop for Session<'a, E> { /// A session statement result. Sent across the wire to SQL clients. #[derive(Debug, PartialEq, Serialize, Deserialize)] pub enum StatementResult { - Begin { version: mvcc::Version, read_only: bool }, + Begin { state: mvcc::TransactionState }, Commit { version: mvcc::Version }, Rollback { version: mvcc::Version }, Explain(Plan), diff --git a/tests/e2e/client.rs b/tests/e2e/client.rs index 28ba01e03..0b9499302 100644 --- a/tests/e2e/client.rs +++ b/tests/e2e/client.rs @@ -1,3 +1,5 @@ +use std::collections::BTreeSet; + use super::{assert_row, assert_rows, dataset, TestCluster}; use toydb::error::{Error, Result}; @@ -240,8 +242,16 @@ fn execute_txn() -> Result<()> { assert_eq!(c.txn(), None); // Committing a change in a txn should work - assert_eq!(c.execute("BEGIN")?, StatementResult::Begin { version: 2, read_only: false }); - assert_eq!(c.txn(), Some((2, false))); + assert_eq!( + c.execute("BEGIN")?, + StatementResult::Begin { + state: mvcc::TransactionState { version: 2, read_only: false, active: BTreeSet::new() } + } + ); + assert_eq!( + c.txn(), + Some(&mvcc::TransactionState { version: 2, read_only: false, active: BTreeSet::new() }) + ); c.execute("INSERT INTO genres VALUES (4, 'Drama')")?; assert_eq!(c.execute("COMMIT")?, StatementResult::Commit { version: 2 }); assert_eq!(c.txn(), None); @@ -252,8 +262,16 @@ fn execute_txn() -> Result<()> { assert_eq!(c.txn(), None); // Rolling back a change in a txn should also work - assert_eq!(c.execute("BEGIN")?, StatementResult::Begin { version: 3, read_only: false }); - assert_eq!(c.txn(), Some((3, false))); + assert_eq!( + c.execute("BEGIN")?, + StatementResult::Begin { + state: mvcc::TransactionState { version: 3, read_only: false, active: BTreeSet::new() } + } + ); + assert_eq!( + c.txn(), + Some(&mvcc::TransactionState { version: 3, read_only: false, active: BTreeSet::new() }) + ); c.execute("INSERT INTO genres VALUES (5, 'Musical')")?; assert_row( c.execute("SELECT * FROM genres WHERE id = 5")?, @@ -266,9 +284,14 @@ fn execute_txn() -> Result<()> { // Starting a read-only txn should block writes assert_eq!( c.execute("BEGIN READ ONLY")?, - StatementResult::Begin { version: 4, read_only: true } + StatementResult::Begin { + state: mvcc::TransactionState { version: 4, read_only: true, active: BTreeSet::new() } + } + ); + assert_eq!( + c.txn(), + Some(&mvcc::TransactionState { version: 4, read_only: true, active: BTreeSet::new() }) ); - assert_eq!(c.txn(), Some((4, true))); assert_row( c.execute("SELECT * FROM genres WHERE id = 4")?, vec![Value::Integer(4), Value::String("Drama".into())], @@ -284,9 +307,14 @@ fn execute_txn() -> Result<()> { // block writes assert_eq!( c.execute("BEGIN READ ONLY AS OF SYSTEM TIME 2")?, - StatementResult::Begin { version: 2, read_only: true }, + StatementResult::Begin { + state: mvcc::TransactionState { version: 2, read_only: true, active: BTreeSet::new() } + }, + ); + assert_eq!( + c.txn(), + Some(&mvcc::TransactionState { version: 2, read_only: true, active: BTreeSet::new() }) ); - assert_eq!(c.txn(), Some((2, true))); assert_rows( c.execute("SELECT * FROM genres")?, vec![ @@ -299,13 +327,21 @@ fn execute_txn() -> Result<()> { assert_eq!(c.execute("COMMIT")?, StatementResult::Commit { version: 2 }); // A txn should still be usable after an error occurs - assert_eq!(c.execute("BEGIN")?, StatementResult::Begin { version: 4, read_only: false }); + assert_eq!( + c.execute("BEGIN")?, + StatementResult::Begin { + state: mvcc::TransactionState { version: 4, read_only: false, active: BTreeSet::new() } + }, + ); c.execute("INSERT INTO genres VALUES (5, 'Horror')")?; assert_eq!( c.execute("INSERT INTO genres VALUES (5, 'Musical')"), Err(Error::InvalidInput("primary key 5 already exists".into())) ); - assert_eq!(c.txn(), Some((4, false))); + assert_eq!( + c.txn(), + Some(&mvcc::TransactionState { version: 4, read_only: false, active: BTreeSet::new() }) + ); c.execute("INSERT INTO genres VALUES (6, 'Western')")?; assert_eq!(c.execute("COMMIT")?, StatementResult::Commit { version: 4 }); assert_rows( @@ -331,8 +367,22 @@ fn execute_txn_concurrent() -> Result<()> { let mut b = tc.connect_any()?; // Concurrent updates should throw a serialization failure on conflict. - assert_eq!(a.execute("BEGIN")?, StatementResult::Begin { version: 2, read_only: false }); - assert_eq!(b.execute("BEGIN")?, StatementResult::Begin { version: 3, read_only: false }); + assert_eq!( + a.execute("BEGIN")?, + StatementResult::Begin { + state: mvcc::TransactionState { version: 2, read_only: false, active: BTreeSet::new() } + }, + ); + assert_eq!( + b.execute("BEGIN")?, + StatementResult::Begin { + state: mvcc::TransactionState { + version: 3, + read_only: false, + active: BTreeSet::from([2]) + } + }, + ); assert_row( a.execute("SELECT * FROM genres WHERE id = 1")?,