diff --git a/src/db.rs b/src/db.rs index 0d9f95c3..3c0392de 100644 --- a/src/db.rs +++ b/src/db.rs @@ -797,7 +797,6 @@ impl Builder { } } -// This just makes it easier to throw `dbg` etc statements on `Result` impl std::fmt::Debug for Database { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("Database").finish() diff --git a/src/multimap_table.rs b/src/multimap_table.rs index ca108606..62e000d6 100644 --- a/src/multimap_table.rs +++ b/src/multimap_table.rs @@ -713,7 +713,6 @@ impl<'a, K: RedbKey + 'static, V: RedbKey + 'static> DoubleEndedIterator /// [Multimap tables](https://en.wikipedia.org/wiki/Multimap) may have multiple values associated with each key pub struct MultimapTable<'db, 'txn, K: RedbKey + 'static, V: RedbKey + 'static> { name: String, - system: bool, transaction: &'txn WriteTransaction<'db>, freed_pages: Arc>>, tree: BtreeMut<'txn, K, &'static DynamicCollection>, @@ -724,7 +723,6 @@ pub struct MultimapTable<'db, 'txn, K: RedbKey + 'static, V: RedbKey + 'static> impl<'db, 'txn, K: RedbKey + 'static, V: RedbKey + 'static> MultimapTable<'db, 'txn, K, V> { pub(crate) fn new( name: &str, - system: bool, table_root: Option<(PageNumber, Checksum)>, freed_pages: Arc>>, mem: &'db TransactionalMemory, @@ -732,7 +730,6 @@ impl<'db, 'txn, K: RedbKey + 'static, V: RedbKey + 'static> MultimapTable<'db, ' ) -> MultimapTable<'db, 'txn, K, V> { MultimapTable { name: name.to_string(), - system, transaction, freed_pages: freed_pages.clone(), tree: BtreeMut::new(table_root, mem, freed_pages), @@ -1129,8 +1126,7 @@ impl<'db, 'txn, K: RedbKey + 'static, V: RedbKey + 'static> Drop for MultimapTable<'db, 'txn, K, V> { fn drop(&mut self) { - self.transaction - .close_table(&self.name, self.system, &self.tree); + self.transaction.close_table(&self.name, &self.tree); } } diff --git a/src/table.rs b/src/table.rs index 6785a8fd..5e8d5b9a 100644 --- a/src/table.rs +++ b/src/table.rs @@ -57,7 +57,6 @@ impl TableStats { /// A table containing key-value mappings pub struct Table<'db, 'txn, K: RedbKey + 'static, V: RedbValue + 'static> { name: String, - system: bool, transaction: &'txn WriteTransaction<'db>, tree: BtreeMut<'txn, K, V>, } @@ -65,7 +64,6 @@ pub struct Table<'db, 'txn, K: RedbKey + 'static, V: RedbValue + 'static> { impl<'db, 'txn, K: RedbKey + 'static, V: RedbValue + 'static> Table<'db, 'txn, K, V> { pub(crate) fn new( name: &str, - system: bool, table_root: Option<(PageNumber, Checksum)>, freed_pages: Arc>>, mem: &'db TransactionalMemory, @@ -73,7 +71,6 @@ impl<'db, 'txn, K: RedbKey + 'static, V: RedbValue + 'static> Table<'db, 'txn, K ) -> Table<'db, 'txn, K, V> { Table { name: name.to_string(), - system, transaction, tree: BtreeMut::new(table_root, mem, freed_pages), } @@ -248,8 +245,7 @@ impl Sealed for Table<'_, '_, K, V> {} impl<'db, 'txn, K: RedbKey + 'static, V: RedbValue + 'static> Drop for Table<'db, 'txn, K, V> { fn drop(&mut self) { - self.transaction - .close_table(&self.name, self.system, &self.tree); + self.transaction.close_table(&self.name, &self.tree); } } @@ -470,7 +466,7 @@ pub struct Range<'a, K: RedbKey + 'static, V: RedbValue + 'static> { } impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> Range<'a, K, V> { - fn new(inner: BtreeRangeIter<'a, K, V>) -> Self { + pub(super) fn new(inner: BtreeRangeIter<'a, K, V>) -> Self { Self { inner } } } diff --git a/src/transactions.rs b/src/transactions.rs index dc9cf4fa..6659671b 100644 --- a/src/transactions.rs +++ b/src/transactions.rs @@ -2,24 +2,25 @@ use crate::error::CommitError; use crate::sealed::Sealed; use crate::transaction_tracker::{SavepointId, TransactionId, TransactionTracker}; use crate::tree_store::{ - Btree, BtreeMut, FreedPageList, FreedTableKey, InternalTableDefinition, PageHint, PageNumber, - SerializedSavepoint, TableTree, TableType, TransactionalMemory, + Btree, BtreeMut, Checksum, FreedPageList, FreedTableKey, InternalTableDefinition, PageHint, + PageNumber, SerializedSavepoint, TableTree, TableType, TransactionalMemory, MAX_VALUE_LENGTH, }; use crate::types::{RedbKey, RedbValue}; use crate::{ - Database, MultimapTable, MultimapTableDefinition, MultimapTableHandle, ReadOnlyMultimapTable, - ReadOnlyTable, ReadableTable, Result, Savepoint, SavepointError, Table, TableDefinition, - TableError, TableHandle, UntypedMultimapTableHandle, UntypedTableHandle, + AccessGuard, Database, MultimapTable, MultimapTableDefinition, MultimapTableHandle, Range, + ReadOnlyMultimapTable, ReadOnlyTable, Result, Savepoint, SavepointError, StorageError, Table, + TableDefinition, TableError, TableHandle, UntypedMultimapTableHandle, UntypedTableHandle, }; #[cfg(feature = "logging")] use log::{info, warn}; +use std::borrow::Borrow; use std::cmp::min; use std::collections::{HashMap, HashSet}; use std::fmt::{Display, Formatter}; use std::marker::PhantomData; -use std::ops::RangeFull; +use std::ops::{RangeBounds, RangeFull}; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, Mutex, MutexGuard, RwLock}; +use std::sync::{Arc, Mutex, MutexGuard}; use std::{panic, thread}; const NEXT_SAVEPOINT_TABLE: SystemTableDefinition<(), SavepointId> = @@ -185,6 +186,191 @@ pub enum Durability { Paranoid, } +// Like a Table but only one may be open at a time to avoid possible races +pub struct SystemTable<'db, 's, K: RedbKey + 'static, V: RedbValue + 'static> { + name: String, + namespace: &'s mut SystemNamespace<'db>, + tree: BtreeMut<'s, K, V>, +} + +impl<'db, 's, K: RedbKey + 'static, V: RedbValue + 'static> SystemTable<'db, 's, K, V> { + fn new( + name: &str, + table_root: Option<(PageNumber, Checksum)>, + freed_pages: Arc>>, + mem: &'db TransactionalMemory, + namespace: &'s mut SystemNamespace<'db>, + ) -> SystemTable<'db, 's, K, V> { + SystemTable { + name: name.to_string(), + namespace, + tree: BtreeMut::new(table_root, mem, freed_pages), + } + } + + fn get<'a>(&self, key: impl Borrow>) -> Result>> + where + K: 'a, + { + self.tree.get(key.borrow()) + } + + fn range<'a, KR>(&self, range: impl RangeBounds + 'a) -> Result> + where + K: 'a, + KR: Borrow> + 'a, + { + self.tree.range(&range).map(Range::new) + } + + pub fn insert<'k, 'v>( + &mut self, + key: impl Borrow>, + value: impl Borrow>, + ) -> Result>> { + let value_len = V::as_bytes(value.borrow()).as_ref().len(); + if value_len > MAX_VALUE_LENGTH { + return Err(StorageError::ValueTooLarge(value_len)); + } + let key_len = K::as_bytes(key.borrow()).as_ref().len(); + if key_len > MAX_VALUE_LENGTH { + return Err(StorageError::ValueTooLarge(key_len)); + } + self.tree.insert(key.borrow(), value.borrow()) + } + + pub fn remove<'a>( + &mut self, + key: impl Borrow>, + ) -> Result>> + where + K: 'a, + { + self.tree.remove(key.borrow()) + } +} + +impl<'db, 's, K: RedbKey + 'static, V: RedbValue + 'static> Drop for SystemTable<'db, 's, K, V> { + fn drop(&mut self) { + self.namespace.close_table(&self.name, &self.tree); + } +} + +struct SystemNamespace<'db> { + table_tree: TableTree<'db>, +} + +impl<'db> SystemNamespace<'db> { + fn open_system_table<'txn, 's, K: RedbKey + 'static, V: RedbValue + 'static>( + &'s mut self, + transaction: &'txn WriteTransaction<'db>, + definition: SystemTableDefinition, + ) -> Result> { + #[cfg(feature = "logging")] + info!("Opening system table: {}", definition); + let root = self + .table_tree + .get_or_create_table::(definition.name(), TableType::Normal) + .map_err(|e| { + e.into_storage_error_or_corrupted("Internal error. System table is corrupted") + })?; + transaction.dirty.store(true, Ordering::Release); + + Ok(SystemTable::new( + definition.name(), + root.get_root(), + transaction.freed_pages.clone(), + transaction.mem, + self, + )) + } + + fn close_table( + &mut self, + name: &str, + table: &BtreeMut, + ) { + self.table_tree + .stage_update_table_root(name, table.get_root()); + } +} + +struct TableNamespace<'db> { + open_tables: HashMap>, + table_tree: TableTree<'db>, +} + +impl<'db> TableNamespace<'db> { + #[track_caller] + fn inner_open( + &mut self, + name: &str, + table_type: TableType, + ) -> Result, TableError> { + if let Some(location) = self.open_tables.get(name) { + return Err(TableError::TableAlreadyOpen(name.to_string(), location)); + } + + let internal_table = self + .table_tree + .get_or_create_table::(name, table_type)?; + self.open_tables + .insert(name.to_string(), panic::Location::caller()); + + Ok(internal_table.get_root()) + } + + #[track_caller] + pub fn open_multimap_table<'txn, K: RedbKey + 'static, V: RedbKey + 'static>( + &mut self, + transaction: &'txn WriteTransaction<'db>, + definition: MultimapTableDefinition, + ) -> Result, TableError> { + #[cfg(feature = "logging")] + info!("Opening multimap table: {}", definition); + let root = self.inner_open::(definition.name(), TableType::Multimap)?; + transaction.dirty.store(true, Ordering::Release); + + Ok(MultimapTable::new( + definition.name(), + root, + transaction.freed_pages.clone(), + transaction.mem, + transaction, + )) + } + + #[track_caller] + pub fn open_table<'txn, K: RedbKey + 'static, V: RedbValue + 'static>( + &mut self, + transaction: &'txn WriteTransaction<'db>, + definition: TableDefinition, + ) -> Result, TableError> { + #[cfg(feature = "logging")] + info!("Opening table: {}", definition); + let root = self.inner_open::(definition.name(), TableType::Normal)?; + transaction.dirty.store(true, Ordering::Release); + + Ok(Table::new( + definition.name(), + root, + transaction.freed_pages.clone(), + transaction.mem, + transaction, + )) + } + + pub(crate) fn close_table( + &mut self, + name: &str, + table: &BtreeMut, + ) { + self.open_tables.remove(name).unwrap(); + self.table_tree + .stage_update_table_root(name, table.get_root()); + } +} + /// A read/write transaction /// /// Only a single [`WriteTransaction`] may exist at a time @@ -193,8 +379,6 @@ pub struct WriteTransaction<'db> { transaction_tracker: Arc>, mem: &'db TransactionalMemory, transaction_id: TransactionId, - table_tree: RwLock>, - system_table_tree: RwLock>, // The table of freed pages by transaction. FreedTableKey -> binary. // The binary blob is a length-prefixed array of PageNumber freed_tree: Mutex>>, @@ -202,8 +386,8 @@ pub struct WriteTransaction<'db> { // Pages that were freed from the freed-tree. These can be freed immediately after commit(), // since read transactions do not access the freed-tree post_commit_frees: Arc>>, - open_tables: Mutex>>, - open_system_tables: Mutex>>, + tables: Mutex>, + system_tables: Mutex>, completed: bool, dirty: AtomicBool, durability: Durability, @@ -230,21 +414,22 @@ impl<'db> WriteTransaction<'db> { let freed_root = db.get_memory().get_freed_root(); let freed_pages = Arc::new(Mutex::new(vec![])); let post_commit_frees = Arc::new(Mutex::new(vec![])); + + let tables = TableNamespace { + open_tables: Default::default(), + table_tree: TableTree::new(root_page, db.get_memory(), freed_pages.clone()), + }; + let system_tables = SystemNamespace { + table_tree: TableTree::new(system_page, db.get_memory(), freed_pages.clone()), + }; + Ok(Self { db, transaction_tracker, mem: db.get_memory(), transaction_id, - table_tree: RwLock::new(TableTree::new( - root_page, - db.get_memory(), - freed_pages.clone(), - )), - system_table_tree: RwLock::new(TableTree::new( - system_page, - db.get_memory(), - freed_pages.clone(), - )), + tables: Mutex::new(tables), + system_tables: Mutex::new(system_tables), freed_tree: Mutex::new(BtreeMut::new( freed_root, db.get_memory(), @@ -252,8 +437,6 @@ impl<'db> WriteTransaction<'db> { )), freed_pages, post_commit_frees, - open_tables: Mutex::new(Default::default()), - open_system_tables: Mutex::new(Default::default()), completed: false, dirty: AtomicBool::new(false), durability: Durability::Immediate, @@ -263,50 +446,6 @@ impl<'db> WriteTransaction<'db> { }) } - #[track_caller] - fn open_system_table<'txn, K: RedbKey + 'static, V: RedbValue + 'static>( - &'txn self, - definition: SystemTableDefinition, - ) -> Result> { - #[cfg(feature = "logging")] - info!("Opening system table: {}", definition); - if let Some(location) = self - .open_system_tables - .lock() - .unwrap() - .get(definition.name()) - { - panic!( - "System table {} is already open at {}", - definition.name(), - location - ); - } - self.dirty.store(true, Ordering::Release); - - let internal_table = self - .system_table_tree - .write() - .unwrap() - .get_or_create_table::(definition.name(), TableType::Normal) - .map_err(|e| { - e.into_storage_error_or_corrupted("Internal error. System table is corrupted") - })?; - self.open_system_tables - .lock() - .unwrap() - .insert(definition.name().to_string(), panic::Location::caller()); - - Ok(Table::new( - definition.name(), - true, - internal_table.get_root(), - self.freed_pages.clone(), - self.mem, - self, - )) - } - /// Creates a snapshot of the current database state, which can be used to rollback the database. /// This savepoint will exist until it is deleted with `[delete_savepoint()]`. /// @@ -325,10 +464,13 @@ impl<'db> WriteTransaction<'db> { let mut savepoint = self.ephemeral_savepoint()?; - let mut next_table = self.open_system_table(NEXT_SAVEPOINT_TABLE)?; - let mut savepoint_table = self.open_system_table(SAVEPOINT_TABLE)?; + let mut system_tables = self.system_tables.lock().unwrap(); + + let mut next_table = system_tables.open_system_table(self, NEXT_SAVEPOINT_TABLE)?; next_table.insert((), savepoint.get_id().next())?; + drop(next_table); + let mut savepoint_table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?; savepoint_table.insert( savepoint.get_id(), SerializedSavepoint::from_savepoint(&savepoint), @@ -345,7 +487,8 @@ impl<'db> WriteTransaction<'db> { } pub(crate) fn next_persistent_savepoint_id(&self) -> Result> { - let next_table = self.open_system_table(NEXT_SAVEPOINT_TABLE)?; + let mut system_tables = self.system_tables.lock().unwrap(); + let next_table = system_tables.open_system_table(self, NEXT_SAVEPOINT_TABLE)?; let value = next_table.get(())?; if let Some(next_id) = value { Ok(Some(next_id.value())) @@ -356,7 +499,8 @@ impl<'db> WriteTransaction<'db> { /// Get a persistent savepoint given its id pub fn get_persistent_savepoint(&self, id: u64) -> Result { - let table = self.open_system_table(SAVEPOINT_TABLE)?; + let mut system_tables = self.system_tables.lock().unwrap(); + let table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?; let value = table.get(SavepointId(id))?; value @@ -377,7 +521,8 @@ impl<'db> WriteTransaction<'db> { ) { return Err(SavepointError::InvalidSavepoint); } - let mut table = self.open_system_table(SAVEPOINT_TABLE)?; + let mut system_tables = self.system_tables.lock().unwrap(); + let mut table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?; let savepoint = table.remove(SavepointId(id))?; if let Some(serialized) = savepoint { let savepoint = serialized @@ -395,7 +540,8 @@ impl<'db> WriteTransaction<'db> { /// List all persistent savepoints pub fn list_persistent_savepoints(&self) -> Result> { - let table = self.open_system_table(SAVEPOINT_TABLE)?; + let mut system_tables = self.system_tables.lock().unwrap(); + let table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?; let mut savepoints = vec![]; for savepoint in table.range::(..)? { savepoints.push(savepoint?.0.value().0); @@ -473,9 +619,10 @@ impl<'db> WriteTransaction<'db> { // We don't want to rollback the system tree, so keep any pages it references let referenced_by_system_tree = self - .system_table_tree - .read() + .system_tables + .lock() .unwrap() + .table_tree .all_referenced_pages()?; let mut freed_pages = vec![]; @@ -490,11 +637,11 @@ impl<'db> WriteTransaction<'db> { } } *self.freed_pages.lock().unwrap() = freed_pages; - self.table_tree = RwLock::new(TableTree::new( + self.tables.lock().unwrap().table_tree = TableTree::new( savepoint.get_user_root(), self.mem, self.freed_pages.clone(), - )); + ); // Remove any freed pages that have already been processed. Otherwise this would result in a double free // We assume below that PageNumber is length 8 @@ -572,34 +719,7 @@ impl<'db> WriteTransaction<'db> { &'txn self, definition: TableDefinition, ) -> Result, TableError> { - #[cfg(feature = "logging")] - info!("Opening table: {}", definition); - if let Some(location) = self.open_tables.lock().unwrap().get(definition.name()) { - return Err(TableError::TableAlreadyOpen( - definition.name().to_string(), - location, - )); - } - self.dirty.store(true, Ordering::Release); - - let internal_table = self - .table_tree - .write() - .unwrap() - .get_or_create_table::(definition.name(), TableType::Normal)?; - self.open_tables - .lock() - .unwrap() - .insert(definition.name().to_string(), panic::Location::caller()); - - Ok(Table::new( - definition.name(), - false, - internal_table.get_root(), - self.freed_pages.clone(), - self.mem, - self, - )) + self.tables.lock().unwrap().open_table(self, definition) } /// Open the given table @@ -610,59 +730,18 @@ impl<'db> WriteTransaction<'db> { &'txn self, definition: MultimapTableDefinition, ) -> Result, TableError> { - #[cfg(feature = "logging")] - info!("Opening multimap table: {}", definition); - if let Some(location) = self.open_tables.lock().unwrap().get(definition.name()) { - return Err(TableError::TableAlreadyOpen( - definition.name().to_string(), - location, - )); - } - self.dirty.store(true, Ordering::Release); - - let internal_table = self - .table_tree - .write() - .unwrap() - .get_or_create_table::(definition.name(), TableType::Multimap)?; - self.open_tables + self.tables .lock() .unwrap() - .insert(definition.name().to_string(), panic::Location::caller()); - - Ok(MultimapTable::new( - definition.name(), - false, - internal_table.get_root(), - self.freed_pages.clone(), - self.mem, - self, - )) + .open_multimap_table(self, definition) } pub(crate) fn close_table( &self, name: &str, - system: bool, table: &BtreeMut, ) { - if system { - self.open_system_tables - .lock() - .unwrap() - .remove(name) - .unwrap(); - self.system_table_tree - .write() - .unwrap() - .stage_update_table_root(name, table.get_root()); - } else { - self.open_tables.lock().unwrap().remove(name).unwrap(); - self.table_tree - .write() - .unwrap() - .stage_update_table_root(name, table.get_root()); - } + self.tables.lock().unwrap().close_table(name, table); } /// Delete the given table @@ -672,9 +751,10 @@ impl<'db> WriteTransaction<'db> { #[cfg(feature = "logging")] info!("Deleting table: {}", definition.name()); self.dirty.store(true, Ordering::Release); - self.table_tree - .write() + self.tables + .lock() .unwrap() + .table_tree .delete_table(definition.name(), TableType::Normal) } @@ -688,17 +768,19 @@ impl<'db> WriteTransaction<'db> { #[cfg(feature = "logging")] info!("Deleting multimap table: {}", definition.name()); self.dirty.store(true, Ordering::Release); - self.table_tree - .write() + self.tables + .lock() .unwrap() + .table_tree .delete_table(definition.name(), TableType::Multimap) } /// List all the tables pub fn list_tables(&self) -> Result + '_> { - self.table_tree - .read() + self.tables + .lock() .unwrap() + .table_tree .list_tables(TableType::Normal) .map(|x| x.into_iter().map(UntypedTableHandle::new)) } @@ -707,9 +789,10 @@ impl<'db> WriteTransaction<'db> { pub fn list_multimap_tables( &self, ) -> Result + '_> { - self.table_tree - .read() + self.tables + .lock() .unwrap() + .table_tree .list_tables(TableType::Multimap) .map(|x| x.into_iter().map(UntypedMultimapTableHandle::new)) } @@ -778,7 +861,11 @@ impl<'db> WriteTransaction<'db> { }, } } - self.table_tree.write().unwrap().clear_table_root_updates(); + self.tables + .lock() + .unwrap() + .table_tree + .clear_table_root_updates(); self.mem.rollback_uncommitted_writes()?; #[cfg(feature = "logging")] info!("Finished abort of transaction id={:?}", self.transaction_id); @@ -794,15 +881,17 @@ impl<'db> WriteTransaction<'db> { .unwrap_or(self.transaction_id); let user_root = self - .table_tree - .write() + .tables + .lock() .unwrap() + .table_tree .flush_table_root_updates()?; let system_root = self - .system_table_tree - .write() + .system_tables + .lock() .unwrap() + .table_tree .flush_table_root_updates()?; self.process_freed_pages(oldest_live_read)?; @@ -850,15 +939,17 @@ impl<'db> WriteTransaction<'db> { // Commit without a durability guarantee pub(crate) fn non_durable_commit(&mut self) -> Result { let user_root = self - .table_tree - .write() + .tables + .lock() .unwrap() + .table_tree .flush_table_root_updates()?; let system_root = self - .system_table_tree - .write() + .system_tables + .lock() .unwrap() + .table_tree .flush_table_root_updates()?; // Store all freed pages for a future commit(), since we can't free pages during a @@ -891,7 +982,8 @@ impl<'db> WriteTransaction<'db> { } // Relocate the btree pages - let mut table_tree = self.table_tree.write().unwrap(); + let mut tables = self.tables.lock().unwrap(); + let table_tree = &mut tables.table_tree; if table_tree.compact_tables()? { progress = true; } @@ -973,7 +1065,8 @@ impl<'db> WriteTransaction<'db> { /// Retrieves information about storage usage in the database pub fn stats(&self) -> Result { - let table_tree = self.table_tree.read().unwrap(); + let tables = self.tables.lock().unwrap(); + let table_tree = &tables.table_tree; let data_tree_stats = table_tree.stats()?; let freed_tree_stats = self.freed_tree.lock().unwrap().stats()?; let total_metadata_bytes = data_tree_stats.metadata_bytes() @@ -998,9 +1091,10 @@ impl<'db> WriteTransaction<'db> { pub(crate) fn print_debug(&self) -> Result { // Flush any pending updates to make sure we get the latest root if let Some(page) = self - .table_tree - .write() + .tables + .lock() .unwrap() + .table_tree .flush_table_root_updates() .unwrap() {