diff --git a/docs/design.md b/docs/design.md index b584d7ee..2e1b4b16 100644 --- a/docs/design.md +++ b/docs/design.md @@ -31,8 +31,8 @@ database file. | magic number | | magic con.| god byte | padding | page size | | region header pages | region max data pages | -| region tracker page number | | number of full regions | data pages in trailing region | +| region tracker page number | | padding | | padding | | padding | @@ -88,12 +88,16 @@ controls which transaction pointer is the primary. `magic number` must be set to the ASCII letters 'redb' followed by 0x1A, 0x0A, 0xA9, 0x0D, 0x0A. This sequence is inspired by the PNG magic number. -`god byte`, so named because this byte controls the state of the entire database, is a bitfield containing two flags: +`god byte`, so named because this byte controls the state of the entire database, is a bitfield containing three flags: * first bit: `primary_bit` flag which indicates whether transaction slot 0 or transaction slot 1 contains the latest commit. - redb relies on the fact that this is a single bit to perform atomic commits. -* second bit: `recovery_required` flag, if set then the recovery process must be run when opening the database. - During the recovery process, the region tracker and regional allocator states -- described below -- are reconstructed - by walking the btree from all active roots. +* second bit: `recovery_required` flag, if set then the recovery process must be run when opening the database. This can be + a full repair, in which the region tracker and regional allocator states -- described below -- are reconstructed by walking + the btree from all active roots, or a quick-repair, in which the state is simply loaded from the allocator state table. +* third bit: `two_phase_commit` flag, which indicates whether the transaction in the primary slot was written using 2-phase + commit. If so, the primary slot is guaranteed to be valid, and repair won't look at the secondary slot. This flag is always + updated atomically along with the primary bit. + +redb relies on the fact that this is a single byte to perform atomic commits. `page size` is the size of a redb page in bytes @@ -155,7 +159,9 @@ changed during an upgrade. ### Region tracker The region tracker is an array of `BtreeBitmap`s that tracks the page orders which are free in each region. -It is stored in a page in the data section of a region: +There are two different places it can be stored: on shutdown, it's written to a page in the data section of +a region, and when making a commit with quick-repair enabled, it's written to an entry in the allocator state +table. The former is valid only after a clean shutdown; the latter is usable even after a crash. ``` <-------------------------------------------- 8 bytes -------------------------------------------> ================================================================================================== @@ -216,6 +222,11 @@ range has been allocated * n bytes: free index data * n bytes: allocated data +Like the region tracker, there are two different places where the regional allocator state can be +stored. On shutdown, it's written to the region header as described above, and when making a commit +with quick-repair enabled, it's written to an entry in the allocator state table. The former is valid +only after a clean shutdown; the latter is usable even after a crash. 
+ ``` <-------------------------------------------- 8 bytes -------------------------------------------> ================================================================================================== @@ -461,6 +472,12 @@ To repair the database after an unclean shutdown we must: 2) Update the allocator state, so that it is consistent with all the database roots in the above transaction +If the last commit before the crash had quick-repair enabled, then these are both trivial. The +primary commit slot is guaranteed to be valid, because it was written using 2-phase commit, and +the corresponding allocator state is stored in the allocator state table. + +Otherwise, we need to perform a full repair: + For (1), if the primary commit slot is invalid we switch to the secondary slot. For (2), we rebuild the allocator state by walking the following trees and marking all referenced diff --git a/fuzz/fuzz_targets/common.rs b/fuzz/fuzz_targets/common.rs index faec1d70..ac0d15cb 100644 --- a/fuzz/fuzz_targets/common.rs +++ b/fuzz/fuzz_targets/common.rs @@ -164,6 +164,7 @@ pub(crate) enum FuzzOperation { pub(crate) struct FuzzTransaction { pub ops: Vec, pub durable: bool, + pub quick_repair: bool, pub commit: bool, pub create_ephemeral_savepoint: bool, pub create_persistent_savepoint: bool, diff --git a/fuzz/fuzz_targets/fuzz_redb.rs b/fuzz/fuzz_targets/fuzz_redb.rs index 599784d0..34c56cab 100644 --- a/fuzz/fuzz_targets/fuzz_redb.rs +++ b/fuzz/fuzz_targets/fuzz_redb.rs @@ -583,6 +583,7 @@ fn exec_table_crash_support(config: &FuzzConfig, apply: fn(WriteTransa if !transaction.durable { txn.set_durability(Durability::None); } + txn.set_quick_repair(transaction.quick_repair); let mut counter_table = txn.open_table(COUNTER_TABLE).unwrap(); let uncommitted_id = txn_id as u64 + 1; counter_table.insert((), uncommitted_id)?; diff --git a/src/db.rs b/src/db.rs index 1ff1ffde..c0558e5b 100644 --- a/src/db.rs +++ b/src/db.rs @@ -5,9 +5,7 @@ use crate::tree_store::{ TableType, TransactionalMemory, PAGE_SIZE, }; use crate::types::{Key, Value}; -use crate::{ - CompactionError, DatabaseError, Durability, ReadOnlyTable, SavepointError, StorageError, -}; +use crate::{CompactionError, DatabaseError, ReadOnlyTable, SavepointError, StorageError}; use crate::{ReadTransaction, Result, WriteTransaction}; use std::fmt::{Debug, Display, Formatter}; @@ -21,7 +19,9 @@ use std::sync::{Arc, Mutex}; use crate::error::TransactionError; use crate::sealed::Sealed; -use crate::transactions::SAVEPOINT_TABLE; +use crate::transactions::{ + AllocatorStateKey, AllocatorStateTree, ALLOCATOR_STATE_TABLE_NAME, SAVEPOINT_TABLE, +}; use crate::tree_store::file_backend::FileBackend; #[cfg(feature = "logging")] use log::{debug, info, warn}; @@ -386,17 +386,34 @@ impl Database { .unwrap() .clear_cache_and_reload()?; - if !Self::verify_primary_checksums(self.mem.clone())? 
{ - was_clean = false; - } + let old_roots = [ + self.mem.get_data_root(), + self.mem.get_system_root(), + self.mem.get_freed_root(), + ]; - Self::do_repair(&mut self.mem, &|_| {}).map_err(|err| match err { + let new_roots = Self::do_repair(&mut self.mem, &|_| {}).map_err(|err| match err { DatabaseError::Storage(storage_err) => storage_err, _ => unreachable!(), })?; - if allocator_hash != self.mem.allocator_hash() { + + if old_roots != new_roots || allocator_hash != self.mem.allocator_hash() { was_clean = false; } + + if !was_clean { + let next_transaction_id = self.mem.get_last_committed_transaction_id()?.next(); + let [data_root, system_root, freed_root] = new_roots; + self.mem.commit( + data_root, + system_root, + freed_root, + next_transaction_id, + false, + true, + )?; + } + self.mem.begin_writable()?; Ok(was_clean) @@ -414,7 +431,9 @@ impl Database { return Err(CompactionError::TransactionInProgress); } // Commit to free up any pending free pages - // Use 2-phase commit to avoid any possible security issues. Plus this compaction is going to be so slow that it doesn't matter + // Use 2-phase commit to avoid any possible security issues. Plus this compaction is going to be so slow that it doesn't matter. + // Once https://github.com/cberner/redb/issues/829 is fixed, we should upgrade this to use quick-repair -- that way the user + // can cancel the compaction without requiring a full repair afterwards let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?; if txn.list_persistent_savepoints()?.next().is_some() { return Err(CompactionError::PersistentSavepointExists); @@ -422,11 +441,11 @@ impl Database { if self.transaction_tracker.any_savepoint_exists() { return Err(CompactionError::EphemeralSavepointExists); } - txn.set_durability(Durability::Paranoid); + txn.set_two_phase_commit(true); txn.commit().map_err(|e| e.into_storage_error())?; // Repeat, just in case executing list_persistent_savepoints() created a new table let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?; - txn.set_durability(Durability::Paranoid); + txn.set_two_phase_commit(true); txn.commit().map_err(|e| e.into_storage_error())?; // There can't be any outstanding transactions because we have a `&mut self`, so all pending free pages // should have been cleared out by the above commit() @@ -447,7 +466,7 @@ impl Database { // Double commit to free up the relocated pages for reuse let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?; - txn.set_durability(Durability::Paranoid); + txn.set_two_phase_commit(true); txn.commit().map_err(|e| e.into_storage_error())?; assert!(self.mem.get_freed_root().is_none()); @@ -592,8 +611,14 @@ impl Database { fn do_repair( mem: &mut Arc, // Only &mut to ensure exclusivity repair_callback: &(dyn Fn(&mut RepairSession) + 'static), - ) -> Result<(), DatabaseError> { + ) -> Result<[Option; 3], DatabaseError> { if !Self::verify_primary_checksums(mem.clone())? 
{ + if mem.used_two_phase_commit() { + return Err(DatabaseError::Storage(StorageError::Corrupted( + "Primary is corrupted despite 2-phase commit".to_string(), + ))); + } + // 0.3 because the repair takes 3 full scans and the first is done now let mut handle = RepairSession::new(0.3); repair_callback(&mut handle); @@ -662,19 +687,7 @@ impl Database { // by storing an empty root during the below commit() mem.clear_read_cache(); - let transaction_id = mem.get_last_committed_transaction_id()?.next(); - mem.commit( - data_root, - system_root, - freed_root, - transaction_id, - false, - true, - // don't trim the database file, because we want the allocator hash to match exactly - false, - )?; - - Ok(()) + Ok([data_root, system_root, freed_root]) } fn new( @@ -698,14 +711,31 @@ impl Database { )?; let mut mem = Arc::new(mem); if mem.needs_repair()? { - #[cfg(feature = "logging")] - warn!("Database {:?} not shutdown cleanly. Repairing", &file_path); - let mut handle = RepairSession::new(0.0); - repair_callback(&mut handle); - if handle.aborted() { - return Err(DatabaseError::RepairAborted); + // If the last transaction used 2-phase commit and updated the allocator state table, then + // we can just load the allocator state from there. Otherwise, we need a full repair + if Self::try_quick_repair(mem.clone())? { + #[cfg(feature = "logging")] + info!("Quick-repair successful, full repair not needed"); + } else { + #[cfg(feature = "logging")] + warn!("Database {:?} not shutdown cleanly. Repairing", &file_path); + let mut handle = RepairSession::new(0.0); + repair_callback(&mut handle); + if handle.aborted() { + return Err(DatabaseError::RepairAborted); + } + let [data_root, system_root, freed_root] = + Self::do_repair(&mut mem, repair_callback)?; + let next_transaction_id = mem.get_last_committed_transaction_id()?.next(); + mem.commit( + data_root, + system_root, + freed_root, + next_transaction_id, + false, + true, + )?; } - Self::do_repair(&mut mem, repair_callback)?; } mem.begin_writable()?; @@ -740,6 +770,42 @@ impl Database { Ok(db) } + // Returns true if quick-repair was successful, or false if a full repair is needed + fn try_quick_repair(mem: Arc) -> Result { + // Quick-repair is only possible if the primary was written using 2-phase commit + if !mem.used_two_phase_commit() { + return Ok(false); + } + + // See if the allocator state table is present in the system table tree + let fake_freed_pages = Arc::new(Mutex::new(vec![])); + let system_table_tree = TableTreeMut::new( + mem.get_system_root(), + Arc::new(TransactionGuard::fake()), + mem.clone(), + fake_freed_pages.clone(), + ); + let Some(allocator_state_table) = system_table_tree + .get_table::(ALLOCATOR_STATE_TABLE_NAME, TableType::Normal) + .map_err(|e| e.into_storage_error_or_corrupted("Unexpected TableError"))? + else { + return Ok(false); + }; + + // Load the allocator state from the table + let InternalTableDefinition::Normal { table_root, .. 
} = allocator_state_table else { + unreachable!(); + }; + let tree = AllocatorStateTree::new( + table_root, + Arc::new(TransactionGuard::fake()), + mem.clone(), + fake_freed_pages, + ); + + mem.try_load_allocator_state(&tree) + } + fn allocate_read_transaction(&self) -> Result { let id = self .transaction_tracker @@ -1162,7 +1228,7 @@ mod test { let table_def: TableDefinition = TableDefinition::new("x"); let mut tx = db.begin_write().unwrap(); - tx.set_durability(Durability::Paranoid); + tx.set_two_phase_commit(true); let savepoint0 = tx.ephemeral_savepoint().unwrap(); { tx.open_table(table_def).unwrap(); @@ -1170,7 +1236,7 @@ mod test { tx.commit().unwrap(); let mut tx = db.begin_write().unwrap(); - tx.set_durability(Durability::Paranoid); + tx.set_two_phase_commit(true); let savepoint1 = tx.ephemeral_savepoint().unwrap(); tx.restore_savepoint(&savepoint0).unwrap(); tx.set_durability(Durability::None); @@ -1182,7 +1248,7 @@ mod test { tx.commit().unwrap(); let mut tx = db.begin_write().unwrap(); - tx.set_durability(Durability::Paranoid); + tx.set_two_phase_commit(true); tx.restore_savepoint(&savepoint0).unwrap(); { tx.open_table(table_def).unwrap(); @@ -1190,7 +1256,7 @@ mod test { tx.commit().unwrap(); let mut tx = db.begin_write().unwrap(); - tx.set_durability(Durability::Paranoid); + tx.set_two_phase_commit(true); let savepoint2 = tx.ephemeral_savepoint().unwrap(); drop(savepoint0); tx.restore_savepoint(&savepoint2).unwrap(); @@ -1203,7 +1269,7 @@ mod test { tx.commit().unwrap(); let mut tx = db.begin_write().unwrap(); - tx.set_durability(Durability::Paranoid); + tx.set_two_phase_commit(true); let savepoint3 = tx.ephemeral_savepoint().unwrap(); drop(savepoint1); tx.restore_savepoint(&savepoint3).unwrap(); @@ -1213,7 +1279,7 @@ mod test { tx.commit().unwrap(); let mut tx = db.begin_write().unwrap(); - tx.set_durability(Durability::Paranoid); + tx.set_two_phase_commit(true); let savepoint4 = tx.ephemeral_savepoint().unwrap(); drop(savepoint2); tx.restore_savepoint(&savepoint3).unwrap(); @@ -1225,7 +1291,7 @@ mod test { tx.abort().unwrap(); let mut tx = db.begin_write().unwrap(); - tx.set_durability(Durability::Paranoid); + tx.set_two_phase_commit(true); let savepoint5 = tx.ephemeral_savepoint().unwrap(); drop(savepoint3); assert!(tx.restore_savepoint(&savepoint4).is_err()); @@ -1235,7 +1301,7 @@ mod test { tx.commit().unwrap(); let mut tx = db.begin_write().unwrap(); - tx.set_durability(Durability::Paranoid); + tx.set_two_phase_commit(true); tx.restore_savepoint(&savepoint5).unwrap(); tx.set_durability(Durability::None); { diff --git a/src/multimap_table.rs b/src/multimap_table.rs index 5c24c3a3..ac42a7cc 100644 --- a/src/multimap_table.rs +++ b/src/multimap_table.rs @@ -308,8 +308,8 @@ pub(crate) fn finalize_tree_and_subtree_checksums( value_size, <()>::fixed_width(), ); - subtree.finalize_dirty_checksums()?; - sub_root_updates.push((i, entry.key().to_vec(), subtree.get_root().unwrap())); + let subtree_root = subtree.finalize_dirty_checksums()?.unwrap(); + sub_root_updates.push((i, entry.key().to_vec(), subtree_root)); } } } @@ -327,10 +327,10 @@ pub(crate) fn finalize_tree_and_subtree_checksums( Ok(()) })?; - tree.finalize_dirty_checksums()?; + let root = tree.finalize_dirty_checksums()?; // No pages should have been freed by this operation assert!(freed_pages.lock().unwrap().is_empty()); - Ok(tree.get_root()) + Ok(root) } fn parse_subtree_roots( diff --git a/src/transactions.rs b/src/transactions.rs index 732cbfc1..c2bbe47b 100644 --- a/src/transactions.rs +++ b/src/transactions.rs @@ 
-13,8 +13,8 @@ use crate::types::{Key, Value}; use crate::{ AccessGuard, MultimapTable, MultimapTableDefinition, MultimapTableHandle, Range, ReadOnlyMultimapTable, ReadOnlyTable, Result, Savepoint, SavepointError, StorageError, Table, - TableDefinition, TableError, TableHandle, TransactionError, UntypedMultimapTableHandle, - UntypedTableHandle, + TableDefinition, TableError, TableHandle, TransactionError, TypeName, + UntypedMultimapTableHandle, UntypedTableHandle, }; #[cfg(feature = "logging")] use log::{debug, warn}; @@ -23,6 +23,7 @@ use std::cmp::min; use std::collections::{BTreeMap, HashMap, HashSet}; use std::fmt::{Debug, Display, Formatter}; use std::marker::PhantomData; +use std::mem::size_of; use std::ops::RangeBounds; #[cfg(any(test, fuzzing))] use std::ops::RangeFull; @@ -35,6 +36,70 @@ const NEXT_SAVEPOINT_TABLE: SystemTableDefinition<(), SavepointId> = SystemTableDefinition::new("next_savepoint_id"); pub(crate) const SAVEPOINT_TABLE: SystemTableDefinition = SystemTableDefinition::new("persistent_savepoints"); +// The allocator state table is stored in the system table tree, but it's accessed using +// raw btree operations rather than open_system_table(), so there's no SystemTableDefinition +pub(crate) const ALLOCATOR_STATE_TABLE_NAME: &str = "allocator_state"; +pub(crate) type AllocatorStateTree<'a> = BtreeMut<'a, AllocatorStateKey, &'static [u8]>; + +#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug)] +pub(crate) enum AllocatorStateKey { + Region(u32), + RegionTracker, + TransactionId, +} + +impl Value for AllocatorStateKey { + type SelfType<'a> = Self; + type AsBytes<'a> = [u8; 1 + size_of::()]; + + fn fixed_width() -> Option { + Some(1 + size_of::()) + } + + fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a> + where + Self: 'a, + { + match data[0] { + 0 => Self::Region(u32::from_le_bytes(data[1..].try_into().unwrap())), + 1 => Self::RegionTracker, + 2 => Self::TransactionId, + _ => unreachable!(), + } + } + + fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a> + where + Self: 'a, + Self: 'b, + { + let mut result = Self::AsBytes::default(); + match value { + Self::Region(region) => { + result[0] = 0; + result[1..].copy_from_slice(&u32::to_le_bytes(*region)); + } + Self::RegionTracker => { + result[0] = 1; + } + Self::TransactionId => { + result[0] = 2; + } + } + + result + } + + fn type_name() -> TypeName { + TypeName::internal("redb::AllocatorStateKey") + } +} + +impl Key for AllocatorStateKey { + fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering { + Self::from_bytes(data1).cmp(&Self::from_bytes(data2)) + } +} pub struct SystemTableDefinition<'a, K: Key + 'static, V: Value + 'static> { name: &'a str, @@ -151,48 +216,22 @@ pub enum Durability { Eventual, /// Commits with this durability level are guaranteed to be persistent as soon as /// [`WriteTransaction::commit`] returns. - /// - /// Data is written with checksums, with the following commit algorithm: - /// - /// 1. Update the inactive commit slot with the new database state - /// 2. Flip the god byte primary bit to activate the newly updated commit slot - /// 3. Call `fsync` to ensure all writes have been persisted to disk - /// - /// When opening the database after a crash, the most recent of the two commit slots with a - /// valid checksum is used. - /// - /// Security considerations: The checksum used is xxhash, a fast, non-cryptographic hash - /// function with close to perfect collision resistance when used with non-malicious input. 
An - /// attacker with an extremely high degree of control over the database's workload, including - /// the ability to cause the database process to crash, can cause invalid data to be written - /// with a valid checksum, leaving the database in an invalid, attacker-controlled state. Immediate, - /// Commits with this durability level have the same gaurantees as [`Durability::Immediate`] - /// - /// Additionally, aata is written with the following 2-phase commit algorithm: - /// - /// 1. Update the inactive commit slot with the new database state - /// 2. Call `fsync` to ensure the database slate and commit slot update have been persisted - /// 3. Flip the god byte primary bit to activate the newly updated commit slot - /// 4. Call `fsync` to ensure the write to the god byte has been persisted - /// - /// This mitigates a theoretical attack where an attacker who - /// 1. can control the order in which pages are flushed to disk - /// 2. can introduce crashes during `fsync`, - /// 3. has knowledge of the database file contents, and - /// 4. can include arbitrary data in a write transaction - /// - /// could cause a transaction to partially commit (some but not all of the data is written). - /// This is described in the design doc in futher detail. - /// - /// Security considerations: Many hard disk drives and SSDs do not actually guarantee that data - /// has been persisted to disk after calling `fsync`. Even with this commit level, an attacker - /// with a high degree of control over the database's workload, including the ability to cause - /// the database process to crash, can cause the database to crash with the god byte primary bit - /// pointing to an invalid commit slot, leaving the database in an invalid, potentially attacker-controlled state. + /// This is identical to `Durability::Immediate`, but also enables 2-phase commit. New code + /// should call `set_two_phase_commit(true)` directly instead. + #[deprecated(since = "2.3.0", note = "use set_two_phase_commit(true) instead")] Paranoid, } +// These are the actual durability levels used internally. 
`Durability::Paranoid` is translated +// to `InternalDurability::Immediate`, and also enables 2-phase commit +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +enum InternalDurability { + None, + Eventual, + Immediate, +} + // Like a Table but only one may be open at a time to avoid possible races pub struct SystemTable<'db, 's, K: Key + 'static, V: Value + 'static> { name: String, @@ -447,7 +486,9 @@ pub struct WriteTransaction { system_tables: Mutex>, completed: bool, dirty: AtomicBool, - durability: Durability, + durability: InternalDurability, + two_phase_commit: bool, + quick_repair: bool, // Persistent savepoints created during this transaction created_persistent_savepoints: Mutex>, deleted_persistent_savepoints: Mutex>, @@ -504,7 +545,9 @@ impl WriteTransaction { post_commit_frees, completed: false, dirty: AtomicBool::new(false), - durability: Durability::Immediate, + durability: InternalDurability::Immediate, + two_phase_commit: false, + quick_repair: false, created_persistent_savepoints: Mutex::new(Default::default()), deleted_persistent_savepoints: Mutex::new(vec![]), }) @@ -595,10 +638,7 @@ impl WriteTransaction { /// Returns `[SavepointError::InvalidSavepoint`], if the transaction is "dirty" (any tables have been opened) /// or if the transaction's durability is less than `[Durability::Immediate]` pub fn persistent_savepoint(&self) -> Result { - if !matches!( - self.durability, - Durability::Immediate | Durability::Paranoid - ) { + if self.durability != InternalDurability::Immediate { return Err(SavepointError::InvalidSavepoint); } @@ -659,10 +699,7 @@ impl WriteTransaction { /// Returns `true` if the savepoint existed /// Returns `[SavepointError::InvalidSavepoint`] if the transaction's durability is less than `[Durability::Immediate]` pub fn delete_persistent_savepoint(&self, id: u64) -> Result { - if !matches!( - self.durability, - Durability::Immediate | Durability::Paranoid - ) { + if self.durability != InternalDurability::Immediate { return Err(SavepointError::InvalidSavepoint); } let mut system_tables = self.system_tables.lock().unwrap(); @@ -904,7 +941,74 @@ impl WriteTransaction { .unwrap() .is_empty(); assert!(no_created && no_deleted); - self.durability = durability; + + self.durability = match durability { + Durability::None => InternalDurability::None, + Durability::Eventual => InternalDurability::Eventual, + Durability::Immediate => InternalDurability::Immediate, + #[allow(deprecated)] + Durability::Paranoid => { + self.set_two_phase_commit(true); + InternalDurability::Immediate + } + }; + } + + /// Enable or disable 2-phase commit (defaults to disabled) + /// + /// By default, data is written using the following 1-phase commit algorithm: + /// + /// 1. Update the inactive commit slot with the new database state + /// 2. Flip the god byte primary bit to activate the newly updated commit slot + /// 3. Call `fsync` to ensure all writes have been persisted to disk + /// + /// All data is written with checksums. When opening the database after a crash, the most + /// recent of the two commit slots with a valid checksum is used. + /// + /// Security considerations: The checksum used is xxhash, a fast, non-cryptographic hash + /// function with close to perfect collision resistance when used with non-malicious input. 
An + /// attacker with an extremely high degree of control over the database's workload, including + /// the ability to cause the database process to crash, can cause invalid data to be written + /// with a valid checksum, leaving the database in an invalid, attacker-controlled state. + /// + /// Alternatively, you can enable 2-phase commit, which writes data like this: + /// + /// 1. Update the inactive commit slot with the new database state + /// 2. Call `fsync` to ensure the database state and commit slot update have been persisted + /// 3. Flip the god byte primary bit to activate the newly updated commit slot + /// 4. Call `fsync` to ensure the write to the god byte has been persisted + /// + /// This mitigates a theoretical attack where an attacker who + /// 1. can control the order in which pages are flushed to disk + /// 2. can introduce crashes during `fsync`, + /// 3. has knowledge of the database file contents, and + /// 4. can include arbitrary data in a write transaction + /// + /// could cause a transaction to partially commit (some but not all of the data is written). + /// This is described in the design doc in further detail. + /// + /// Security considerations: Many hard disk drives and SSDs do not actually guarantee that data + /// has been persisted to disk after calling `fsync`. Even with 2-phase commit, an attacker with + /// a high degree of control over the database's workload, including the ability to cause the + /// database process to crash, can cause the database to crash with the god byte primary bit + /// pointing to an invalid commit slot, leaving the database in an invalid, potentially attacker- + /// controlled state. + pub fn set_two_phase_commit(&mut self, enabled: bool) { + self.two_phase_commit = enabled; + } + + /// Enable or disable quick-repair (defaults to disabled) + /// + /// By default, when reopening the database after a crash, redb needs to do a full repair. + /// This involves walking the entire database to verify the checksums and reconstruct the + /// allocator state, so it can be very slow if the database is large. + /// + /// Alternatively, you can enable quick-repair. In this mode, redb saves the allocator state + /// as part of each commit (so it doesn't need to be reconstructed), and enables 2-phase commit + /// (which guarantees that the primary commit slot is valid without needing to look at the + /// checksums). This means commits are slower, but recovery after a crash is almost instant.
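A minimal usage sketch (not part of the diff) of the two transaction options documented above; the database path, table definition, and values are hypothetical, but `set_quick_repair()` is the method added by this change:

```rust
// Hypothetical example: commit a transaction with quick-repair enabled.
// Per the doc comments above, quick-repair implies 2-phase commit.
use redb::{Database, Error, TableDefinition};

const TABLE: TableDefinition<&str, u64> = TableDefinition::new("example");

fn main() -> Result<(), Error> {
    let db = Database::create("example.redb")?;
    let mut txn = db.begin_write()?;
    // Persist the allocator state as part of this commit so that a crash
    // afterwards only needs a quick-repair instead of a full repair
    txn.set_quick_repair(true);
    {
        let mut table = txn.open_table(TABLE)?;
        table.insert("key", &123)?;
    }
    txn.commit()?;
    Ok(())
}
```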
+ pub fn set_quick_repair(&mut self, enabled: bool) { + self.quick_repair = enabled; } /// Open the given table @@ -1000,16 +1104,20 @@ impl WriteTransaction { } fn commit_inner(&mut self) -> Result<(), CommitError> { + // Quick-repair requires 2-phase commit + if self.quick_repair { + self.two_phase_commit = true; + } + #[cfg(feature = "logging")] debug!( - "Committing transaction id={:?} with durability={:?}", - self.transaction_id, self.durability + "Committing transaction id={:?} with durability={:?} two_phase={} quick_repair={}", + self.transaction_id, self.durability, self.two_phase_commit, self.quick_repair ); match self.durability { - Durability::None => self.non_durable_commit()?, - Durability::Eventual => self.durable_commit(true, false)?, - Durability::Immediate => self.durable_commit(false, false)?, - Durability::Paranoid => self.durable_commit(false, true)?, + InternalDurability::None => self.non_durable_commit()?, + InternalDurability::Eventual => self.durable_commit(true)?, + InternalDurability::Immediate => self.durable_commit(false)?, } for (savepoint, transaction) in self.deleted_persistent_savepoints.lock().unwrap().iter() { @@ -1062,39 +1170,72 @@ impl WriteTransaction { Ok(()) } - pub(crate) fn durable_commit(&mut self, eventual: bool, two_phase: bool) -> Result { + pub(crate) fn durable_commit(&mut self, eventual: bool) -> Result { let free_until_transaction = self .transaction_tracker .oldest_live_read_transaction() .map_or(self.transaction_id, |x| x.next()); + self.process_freed_pages(free_until_transaction)?; let user_root = self .tables .lock() .unwrap() .table_tree - .flush_table_root_updates()?; + .flush_table_root_updates()? + .finalize_dirty_checksums()?; - let system_root = self - .system_tables - .lock() - .unwrap() - .table_tree - .flush_table_root_updates()?; + let mut system_tables = self.system_tables.lock().unwrap(); + let system_tree = system_tables.table_tree.flush_table_root_updates()?; + system_tree + .delete_table(ALLOCATOR_STATE_TABLE_NAME, TableType::Normal) + .map_err(|e| e.into_storage_error_or_corrupted("Unexpected TableError"))?; + + if self.quick_repair { + system_tree.create_table_and_flush_table_root( + ALLOCATOR_STATE_TABLE_NAME, + |tree: &mut AllocatorStateTree| { + let mut pagination_counter = 0; + + loop { + let num_regions = self + .mem + .reserve_allocator_state(tree, self.transaction_id)?; + + // We can't free pages after the commit, because that would invalidate our + // saved allocator state. Everything needs to go through the transactional + // free mechanism + self.store_freed_pages(&mut pagination_counter, true)?; + + if self.mem.try_save_allocator_state(tree, num_regions)? { + return Ok(()); + } + + // Clear out the table before retrying, just in case the number of regions + // has somehow shrunk. Don't use retain_in() for this, since it doesn't + // free the pages immediately -- we need to reuse those pages to guarantee + // that our retry loop will eventually terminate + while let Some(guards) = tree.last()? { + let key = guards.0.value(); + drop(guards); + tree.remove(&key)?; + } + } + }, + )?; + } else { + // If a savepoint exists it might reference the freed-tree, since it holds a reference to the + // root of the freed-tree. Therefore, we must use the transactional free mechanism to free + // those pages. If there are no save points then these can be immediately freed, which is + // done at the end of this function. 
+ let savepoint_exists = self.transaction_tracker.any_savepoint_exists(); + self.store_freed_pages(&mut 0, savepoint_exists)?; + } - self.process_freed_pages(free_until_transaction)?; - // If a savepoint exists it might reference the freed-tree, since it holds a reference to the - // root of the freed-tree. Therefore, we must use the transactional free mechanism to free - // those pages. If there are no save points then these can be immediately freed, which is - // done at the end of this function. - let savepoint_exists = self.transaction_tracker.any_savepoint_exists(); - self.store_freed_pages(savepoint_exists)?; + let system_root = system_tree.finalize_dirty_checksums()?; // Finalize freed table checksums, before doing the final commit - // user & system table trees were already finalized when we flushed the pending roots above - self.freed_tree.lock().unwrap().finalize_dirty_checksums()?; - - let freed_root = self.freed_tree.lock().unwrap().get_root(); + let freed_root = self.freed_tree.lock().unwrap().finalize_dirty_checksums()?; self.mem.commit( user_root, @@ -1102,8 +1243,7 @@ impl WriteTransaction { freed_root, self.transaction_id, eventual, - two_phase, - true, + self.two_phase_commit, )?; // Mark any pending non-durable commits as fully committed. @@ -1125,23 +1265,23 @@ impl WriteTransaction { .lock() .unwrap() .table_tree - .flush_table_root_updates()?; + .flush_table_root_updates()? + .finalize_dirty_checksums()?; let system_root = self .system_tables .lock() .unwrap() .table_tree - .flush_table_root_updates()?; + .flush_table_root_updates()? + .finalize_dirty_checksums()?; // Store all freed pages for a future commit(), since we can't free pages during a // non-durable commit (it's non-durable, so could be rolled back anytime in the future) - self.store_freed_pages(true)?; + self.store_freed_pages(&mut 0, true)?; // Finalize all checksums, before doing the final commit - self.freed_tree.lock().unwrap().finalize_dirty_checksums()?; - - let freed_root = self.freed_tree.lock().unwrap().get_root(); + let freed_root = self.freed_tree.lock().unwrap().finalize_dirty_checksums()?; self.mem .non_durable_commit(user_root, system_root, freed_root, self.transaction_id)?; @@ -1244,10 +1384,13 @@ impl WriteTransaction { Ok(()) } - fn store_freed_pages(&mut self, include_post_commit_free: bool) -> Result { + fn store_freed_pages( + &self, + pagination_counter: &mut u64, + include_post_commit_free: bool, + ) -> Result { assert_eq!(PageNumber::serialized_size(), 8); // We assume below that PageNumber is length 8 - let mut pagination_counter = 0u64; let mut freed_tree = self.freed_tree.lock().unwrap(); if include_post_commit_free { // Move all the post-commit pages that came from the freed-tree. These need to be stored @@ -1262,7 +1405,7 @@ impl WriteTransaction { let buffer_size = FreedPageList::required_bytes(chunk_size); let key = FreedTableKey { transaction_id: self.transaction_id.raw_id(), - pagination_id: pagination_counter, + pagination_id: *pagination_counter, }; let mut access_guard = freed_tree.insert_reserve(&key, buffer_size.try_into().unwrap())?; @@ -1275,7 +1418,7 @@ impl WriteTransaction { } drop(access_guard); - pagination_counter += 1; + *pagination_counter += 1; if include_post_commit_free { // Move all the post-commit pages that came from the freed-tree. 
These need to be stored @@ -1329,7 +1472,13 @@ impl WriteTransaction { pub(crate) fn print_debug(&self) -> Result { // Flush any pending updates to make sure we get the latest root let mut tables = self.tables.lock().unwrap(); - if let Some(page) = tables.table_tree.flush_table_root_updates().unwrap() { + if let Some(page) = tables + .table_tree + .flush_table_root_updates() + .unwrap() + .finalize_dirty_checksums() + .unwrap() + { eprintln!("Master tree:"); let master_tree: Btree<&str, InternalTableDefinition> = Btree::new( Some(page), diff --git a/src/tree_store/btree.rs b/src/tree_store/btree.rs index 991bfad6..541e9e3d 100644 --- a/src/tree_store/btree.rs +++ b/src/tree_store/btree.rs @@ -152,7 +152,7 @@ impl UntypedBtreeMut { } // Recomputes the checksum for all pages that are uncommitted - pub(crate) fn finalize_dirty_checksums(&mut self) -> Result { + pub(crate) fn finalize_dirty_checksums(&mut self) -> Result> { let mut root = self.root; if let Some(BtreeHeader { root: ref p, @@ -162,14 +162,14 @@ impl UntypedBtreeMut { { if !self.mem.uncommitted(*p) { // root page is clean - return Ok(()); + return Ok(root); } *checksum = self.finalize_dirty_checksums_helper(*p)?; self.root = root; } - Ok(()) + Ok(root) } fn finalize_dirty_checksums_helper(&mut self, page_number: PageNumber) -> Result { @@ -353,7 +353,7 @@ impl BtreeMut<'_, K, V> { .verify_checksum() } - pub(crate) fn finalize_dirty_checksums(&mut self) -> Result { + pub(crate) fn finalize_dirty_checksums(&mut self) -> Result> { let mut tree = UntypedBtreeMut::new( self.get_root(), self.mem.clone(), @@ -361,9 +361,8 @@ impl BtreeMut<'_, K, V> { K::fixed_width(), V::fixed_width(), ); - tree.finalize_dirty_checksums()?; - self.root = tree.get_root(); - Ok(()) + self.root = tree.finalize_dirty_checksums()?; + Ok(self.root) } #[allow(dead_code)] @@ -435,6 +434,23 @@ impl BtreeMut<'_, K, V> { Ok(old_value) } + // Insert without allocating or freeing any pages. This requires that you've previously + // inserted the same key, with a value of at least the same serialized length, earlier + // in the same transaction. 
If those preconditions aren't satisfied, insert_inplace() + // will panic; it won't allocate under any circumstances + pub(crate) fn insert_inplace( + &mut self, + key: &K::SelfType<'_>, + value: &V::SelfType<'_>, + ) -> Result<()> { + let mut fake_freed_pages = vec![]; + let mut operation = + MutateHelper::::new(&mut self.root, self.mem.clone(), fake_freed_pages.as_mut()); + operation.insert_inplace(key, value)?; + assert!(fake_freed_pages.is_empty()); + Ok(()) + } + pub(crate) fn remove(&mut self, key: &K::SelfType<'_>) -> Result>> { #[cfg(feature = "logging")] trace!("Btree(root={:?}): Deleting {:?}", &self.root, key); diff --git a/src/tree_store/btree_mutator.rs b/src/tree_store/btree_mutator.rs index 7c50335c..3d1790cd 100644 --- a/src/tree_store/btree_mutator.rs +++ b/src/tree_store/btree_mutator.rs @@ -5,7 +5,7 @@ use crate::tree_store::btree_base::{ use crate::tree_store::btree_mutator::DeletionResult::{ DeletedBranch, DeletedLeaf, PartialBranch, PartialLeaf, Subtree, }; -use crate::tree_store::page_store::{Page, PageImpl}; +use crate::tree_store::page_store::{Page, PageImpl, PageMut}; use crate::tree_store::{ AccessGuardMut, BtreeHeader, PageNumber, RawLeafBuilder, TransactionalMemory, }; @@ -496,6 +496,49 @@ impl<'a, 'b, K: Key, V: Value> MutateHelper<'a, 'b, K, V> { }) } + pub(crate) fn insert_inplace( + &mut self, + key: &K::SelfType<'_>, + value: &V::SelfType<'_>, + ) -> Result<()> { + assert!(self.modify_uncommitted); + let header = self.root.expect("Key not found (tree is empty)"); + self.insert_inplace_helper( + self.mem.get_page_mut(header.root)?, + K::as_bytes(key).as_ref(), + V::as_bytes(value).as_ref(), + )?; + *self.root = Some(BtreeHeader::new(header.root, DEFERRED, header.length)); + Ok(()) + } + + fn insert_inplace_helper(&mut self, mut page: PageMut, key: &[u8], value: &[u8]) -> Result<()> { + assert!(self.mem.uncommitted(page.get_page_number())); + + let node_mem = page.memory(); + match node_mem[0] { + LEAF => { + let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width()); + let (position, found) = accessor.position::(key); + assert!(found); + let old_len = accessor.entry(position).unwrap().value().len(); + assert!(value.len() <= old_len); + let mut mutator = LeafMutator::new(&mut page, K::fixed_width(), V::fixed_width()); + mutator.insert(position, true, key, value); + } + BRANCH => { + let accessor = BranchAccessor::new(&page, K::fixed_width()); + let (child_index, child_page) = accessor.child_for_key::(key); + self.insert_inplace_helper(self.mem.get_page_mut(child_page)?, key, value)?; + let mut mutator = BranchMutator::new(&mut page); + mutator.write_child_page(child_index, child_page, DEFERRED); + } + _ => unreachable!(), + } + + Ok(()) + } + fn delete_leaf_helper( &mut self, page: PageImpl, diff --git a/src/tree_store/page_store/header.rs b/src/tree_store/page_store/header.rs index 40b67a88..351a2db2 100644 --- a/src/tree_store/page_store/header.rs +++ b/src/tree_store/page_store/header.rs @@ -5,7 +5,7 @@ use crate::tree_store::page_store::page_manager::{ xxh3_checksum, FILE_FORMAT_VERSION1, FILE_FORMAT_VERSION2, }; use crate::tree_store::{Checksum, PageNumber}; -use crate::{DatabaseError, StorageError}; +use crate::{DatabaseError, Result, StorageError}; use std::mem::size_of; // Database layout: @@ -58,6 +58,7 @@ pub(super) const DB_HEADER_SIZE: usize = TRANSACTION_1_OFFSET + TRANSACTION_SIZE // God byte flags const PRIMARY_BIT: u8 = 1; const RECOVERY_REQUIRED: u8 = 2; +const TWO_PHASE_COMMIT: u8 = 4; // Structure of each commit 
slot const VERSION_OFFSET: usize = 0; @@ -95,6 +96,7 @@ pub(super) struct HeaderRepairInfo { pub(super) struct DatabaseHeader { primary_slot: usize, pub(super) recovery_required: bool, + pub(super) two_phase_commit: bool, page_size: u32, region_header_pages: u32, region_max_data_pages: u32, @@ -120,6 +122,7 @@ impl DatabaseHeader { Self { primary_slot: 0, recovery_required: true, + two_phase_commit: false, page_size: layout.full_region_layout().page_size(), region_header_pages: layout.full_region_layout().get_header_pages(), region_max_data_pages: layout.full_region_layout().num_pages(), @@ -194,12 +197,57 @@ impl DatabaseHeader { self.primary_slot ^= 1; } + // Figure out which slot to use as the primary when starting a repair. The repair process might + // still switch to the other slot later, if the tree checksums turn out to be invalid. + // + // Returns true if we picked the original primary, or false if we swapped + pub(super) fn pick_primary_for_repair( + &mut self, + repair_info: HeaderRepairInfo, + ) -> Result { + // If the primary was written using 2-phase commit, it's guaranteed to be valid. Don't look + // at the secondary; even if it happens to have a valid checksum, Durability::Paranoid means + // we can't trust it + if self.two_phase_commit { + if repair_info.primary_corrupted { + return Err(StorageError::Corrupted( + "Primary is corrupted despite 2-phase commit".to_string(), + )); + } + return Ok(true); + } + + // Pick whichever slot is newer, assuming it has a valid checksum. This handles an edge case + // where we crash during fsync(), and the only data that got written to disk was the god byte + // update swapping the primary -- in that case, the primary contains a valid but out-of-date + // transaction, so we need to load from the secondary instead + if repair_info.primary_corrupted { + if repair_info.secondary_corrupted { + return Err(StorageError::Corrupted( + "Both commit slots are corrupted".to_string(), + )); + } + self.swap_primary_slot(); + return Ok(false); + } + + let secondary_newer = + self.secondary_slot().transaction_id > self.primary_slot().transaction_id; + if secondary_newer && !repair_info.secondary_corrupted { + self.swap_primary_slot(); + return Ok(false); + } + + Ok(true) + } + // TODO: consider returning an Err with the repair info pub(super) fn from_bytes(data: &[u8]) -> Result<(Self, HeaderRepairInfo), DatabaseError> { let invalid_magic_number = data[..MAGICNUMBER.len()] != MAGICNUMBER; let primary_slot = usize::from(data[GOD_BYTE_OFFSET] & PRIMARY_BIT != 0); let recovery_required = (data[GOD_BYTE_OFFSET] & RECOVERY_REQUIRED) != 0; + let two_phase_commit = (data[GOD_BYTE_OFFSET] & TWO_PHASE_COMMIT) != 0; let page_size = get_u32(&data[PAGE_SIZE_OFFSET..]); let region_header_pages = get_u32(&data[REGION_HEADER_PAGES_OFFSET..]); let region_max_data_pages = get_u32(&data[REGION_MAX_DATA_PAGES_OFFSET..]); @@ -226,6 +274,7 @@ impl DatabaseHeader { let result = Self { primary_slot, recovery_required, + two_phase_commit, page_size, region_header_pages, region_max_data_pages, @@ -242,22 +291,18 @@ impl DatabaseHeader { Ok((result, repair)) } - pub(super) fn to_bytes( - &self, - include_magic_number: bool, - swap_primary: bool, - ) -> [u8; DB_HEADER_SIZE] { + pub(super) fn to_bytes(&self, include_magic_number: bool) -> [u8; DB_HEADER_SIZE] { let mut result = [0; DB_HEADER_SIZE]; if include_magic_number { result[..MAGICNUMBER.len()].copy_from_slice(&MAGICNUMBER); } result[GOD_BYTE_OFFSET] = self.primary_slot.try_into().unwrap(); - if swap_primary { - 
result[GOD_BYTE_OFFSET] ^= PRIMARY_BIT; - } if self.recovery_required { result[GOD_BYTE_OFFSET] |= RECOVERY_REQUIRED; } + if self.two_phase_commit { + result[GOD_BYTE_OFFSET] |= TWO_PHASE_COMMIT; + } result[PAGE_SIZE_OFFSET..(PAGE_SIZE_OFFSET + size_of::())] .copy_from_slice(&self.page_size.to_le_bytes()); result[REGION_HEADER_PAGES_OFFSET..(REGION_HEADER_PAGES_OFFSET + size_of::())] diff --git a/src/tree_store/page_store/page_manager.rs b/src/tree_store/page_store/page_manager.rs index 04f29aa6..51e0b0aa 100644 --- a/src/tree_store/page_store/page_manager.rs +++ b/src/tree_store/page_store/page_manager.rs @@ -1,4 +1,5 @@ use crate::transaction_tracker::TransactionId; +use crate::transactions::{AllocatorStateKey, AllocatorStateTree}; use crate::tree_store::btree_base::{BtreeHeader, Checksum}; use crate::tree_store::page_store::base::{PageHint, MAX_PAGE_INDEX}; use crate::tree_store::page_store::buddy_allocator::BuddyAllocator; @@ -11,7 +12,7 @@ use crate::tree_store::{Page, PageNumber}; use crate::StorageBackend; use crate::{DatabaseError, Result, StorageError}; #[cfg(feature = "logging")] -use log::warn; +use log::{debug, warn}; use std::cmp::{max, min}; #[cfg(debug_assertions)] use std::collections::HashMap; @@ -189,10 +190,11 @@ impl TransactionalMemory { ); header.recovery_required = false; + header.two_phase_commit = true; storage .write(0, DB_HEADER_SIZE, true)? .mem_mut() - .copy_from_slice(&header.to_bytes(false, false)); + .copy_from_slice(&header.to_bytes(false)); allocators.flush_to(tracker_page, layout, &storage)?; storage.flush(false)?; @@ -201,7 +203,7 @@ impl TransactionalMemory { storage .write(0, DB_HEADER_SIZE, true)? .mem_mut() - .copy_from_slice(&header.to_bytes(true, false)); + .copy_from_slice(&header.to_bytes(true)); storage.flush(false)?; } let header_bytes = storage.read_direct(0, DB_HEADER_SIZE)?; @@ -221,23 +223,12 @@ impl TransactionalMemory { region_max_pages, page_size.try_into().unwrap(), )); - if repair_info.primary_corrupted { - header.swap_primary_slot(); - } else { - // If the secondary is a valid commit, verify that the primary is newer. This handles an edge case where: - // * the primary bit is flipped to the secondary - // * a crash occurs during fsync, such that no other data is written out to the secondary. meaning that it contains a valid, but out of date transaction - let secondary_newer = - header.secondary_slot().transaction_id > header.primary_slot().transaction_id; - if secondary_newer && !repair_info.secondary_corrupted { - header.swap_primary_slot(); - } - } + header.pick_primary_for_repair(repair_info)?; assert!(!repair_info.invalid_magic_number); storage .write(0, DB_HEADER_SIZE, true)? .mem_mut() - .copy_from_slice(&header.to_bytes(true, false)); + .copy_from_slice(&header.to_bytes(true)); storage.flush(false)?; } @@ -296,19 +287,8 @@ impl TransactionalMemory { // TODO: Also we should recheck the layout let mut was_clean = true; if header.recovery_required { - if repair_info.primary_corrupted { - header.swap_primary_slot(); + if !header.pick_primary_for_repair(repair_info)? { was_clean = false; - } else { - // If the secondary is a valid commit, verify that the primary is newer. This handles an edge case where: - // * the primary bit is flipped to the secondary - // * a crash occurs during fsync, such that no other data is written out to the secondary. 
meaning that it contains a valid, but out of date transaction - let secondary_newer = - header.secondary_slot().transaction_id > header.primary_slot().transaction_id; - if secondary_newer && !repair_info.secondary_corrupted { - header.swap_primary_slot(); - was_clean = false; - } } if repair_info.invalid_magic_number { return Err(StorageError::Corrupted("Invalid magic number".to_string()).into()); @@ -316,7 +296,7 @@ impl TransactionalMemory { self.storage .write(0, DB_HEADER_SIZE, true)? .mem_mut() - .copy_from_slice(&header.to_bytes(true, false)); + .copy_from_slice(&header.to_bytes(true)); self.storage.flush(false)?; } @@ -331,7 +311,7 @@ impl TransactionalMemory { let mut state = self.state.lock().unwrap(); assert!(!state.header.recovery_required); state.header.recovery_required = true; - self.write_header(&state.header, false)?; + self.write_header(&state.header)?; self.storage.flush(false) } @@ -339,6 +319,10 @@ impl TransactionalMemory { Ok(self.state.lock().unwrap().header.recovery_required) } + pub(crate) fn used_two_phase_commit(&self) -> bool { + self.state.lock().unwrap().header.two_phase_commit + } + pub(crate) fn allocator_hash(&self) -> u128 { self.state.lock().unwrap().allocators.xxh3_hash() } @@ -367,11 +351,11 @@ impl TransactionalMemory { allocator.record_alloc(page_number.page_index, page_number.page_order); } - fn write_header(&self, header: &DatabaseHeader, swap_primary: bool) -> Result { + fn write_header(&self, header: &DatabaseHeader) -> Result { self.storage .write(0, DB_HEADER_SIZE, true)? .mem_mut() - .copy_from_slice(&header.to_bytes(true, swap_primary)); + .copy_from_slice(&header.to_bytes(true)); Ok(()) } @@ -391,7 +375,7 @@ impl TransactionalMemory { let mut state = self.state.lock().unwrap(); state.header.set_region_tracker(new_tracker_page); - self.write_header(&state.header, false)?; + self.write_header(&state.header)?; self.storage.flush(false)?; } else { allocator.record_alloc(tracker_page.page_index, tracker_page.page_order); @@ -405,13 +389,137 @@ impl TransactionalMemory { .flush_to(tracker_page, state.header.layout(), &self.storage)?; state.header.recovery_required = false; - self.write_header(&state.header, false)?; + self.write_header(&state.header)?; let result = self.storage.flush(false); self.needs_recovery.store(false, Ordering::Release); result } + pub(crate) fn reserve_allocator_state( + &self, + tree: &mut AllocatorStateTree, + transaction_id: TransactionId, + ) -> Result { + let state = self.state.lock().unwrap(); + let layout = state.header.layout(); + let num_regions = layout.num_regions(); + let region_header_len = layout.full_region_layout().get_header_pages() + * layout.full_region_layout().page_size(); + let region_tracker_len = state.allocators.region_tracker.to_vec().len(); + drop(state); + + for i in 0..num_regions { + tree.insert( + &AllocatorStateKey::Region(i), + &vec![0; region_header_len as usize].as_ref(), + )?; + } + + tree.insert( + &AllocatorStateKey::RegionTracker, + &vec![0; region_tracker_len].as_ref(), + )?; + + tree.insert( + &AllocatorStateKey::TransactionId, + &transaction_id.raw_id().to_le_bytes().as_ref(), + )?; + + Ok(num_regions) + } + + // Returns true on success, or false if the number of regions has changed + pub(crate) fn try_save_allocator_state( + &self, + tree: &mut AllocatorStateTree, + num_regions: u32, + ) -> Result { + // Has the number of regions changed since reserve_allocator_state() was called? 
+ if num_regions != self.state.lock().unwrap().header.layout().num_regions() { + return Ok(false); + } + + for i in 0..num_regions { + let region_bytes = + &self.state.lock().unwrap().allocators.region_allocators[i as usize].to_vec(); + tree.insert_inplace(&AllocatorStateKey::Region(i), ®ion_bytes.as_ref())?; + } + + let region_tracker_bytes = self + .state + .lock() + .unwrap() + .allocators + .region_tracker + .to_vec(); + tree.insert_inplace( + &AllocatorStateKey::RegionTracker, + ®ion_tracker_bytes.as_ref(), + )?; + + Ok(true) + } + + // Returns true on success, or false if the allocator state was stale (in which case we need + // to fall back to a full repair) + pub(crate) fn try_load_allocator_state(&self, tree: &AllocatorStateTree) -> Result { + // See if this is stale allocator state left over from a previous transaction. That won't + // happen during normal operation, since WriteTransaction::commit() always updates the + // allocator state table before calling TransactionalMemory::commit(), but there are also + // a few places where TransactionalMemory::commit() is called directly without using a + // WriteTransaction. When that happens, any existing allocator state table will be left + // in place but is no longer valid. (And even if there were no such calls today, it would + // be an easy mistake to make! So it's good that we check.) + let transaction_id = TransactionId::new(u64::from_le_bytes( + tree.get(&AllocatorStateKey::TransactionId)? + .unwrap() + .value() + .try_into() + .unwrap(), + )); + if transaction_id != self.get_last_committed_transaction_id()? { + #[cfg(feature = "logging")] + debug!("Ignoring stale allocator state from {:?}", transaction_id); + return Ok(false); + } + + // Load the allocator state + let mut region_allocators = vec![]; + for region in + tree.range(&(AllocatorStateKey::Region(0)..=AllocatorStateKey::Region(u32::MAX)))? + { + region_allocators.push(BuddyAllocator::from_bytes(region?.value())); + } + + let region_tracker = RegionTracker::from_page( + tree.get(&AllocatorStateKey::RegionTracker)? + .unwrap() + .value(), + ); + + let mut state = self.state.lock().unwrap(); + state.allocators = Allocators { + region_tracker, + region_allocators, + }; + + // Resize the allocators to match the current file size + let layout = state.header.layout(); + state.allocators.resize_to(layout); + drop(state); + + // Allocate a larger region tracker page if necessary. This also happens automatically on + // shutdown, but we do it here because we want our allocator state to exactly match the + // result of running a full repair + self.ensure_region_tracker_page()?; + + self.state.lock().unwrap().header.recovery_required = false; + self.needs_recovery.store(false, Ordering::Release); + + Ok(true) + } + pub(crate) fn is_allocated(&self, page: PageNumber) -> bool { let state = self.state.lock().unwrap(); let allocator = state.get_region(page.region); @@ -419,6 +527,27 @@ impl TransactionalMemory { allocator.is_allocated(page.page_index, page.page_order) } + // Make sure we have a large enough region-tracker page. 
This uses allocate_non_transactional(), + // so it should only be called from outside a transaction + fn ensure_region_tracker_page(&self) -> Result { + let state = self.state.lock().unwrap(); + let tracker_len = state.allocators.region_tracker.to_vec().len(); + let mut tracker_page = state.header.region_tracker(); + drop(state); + + if tracker_page.page_size_bytes(self.page_size) < (tracker_len as u64) { + // Allocate a larger tracker page + self.free(tracker_page); + tracker_page = self + .allocate_non_transactional(tracker_len)? + .get_page_number(); + let mut state = self.state.lock().unwrap(); + state.header.set_region_tracker(tracker_page); + } + + Ok(()) + } + // Relocates the region tracker to a lower page, if possible // Returns true if the page was moved pub(crate) fn relocate_region_tracker(&self) -> Result { @@ -430,6 +559,9 @@ impl TransactionalMemory { let old_tracker_page = state.header.region_tracker(); // allocate acquires this lock, so we need to drop it drop(state); + // Allocate the new page. Unlike other region-tracker allocations, this happens inside + // a transaction, so we use an ordinary allocation (which gets committed or rolled back + // along with the rest of the transaction) rather than allocate_non_transactional() let new_page = self.allocate_lowest(region_tracker_size.try_into().unwrap())?; if new_page.get_page_number().is_before(old_tracker_page) { let mut state = self.state.lock().unwrap(); @@ -466,7 +598,6 @@ impl TransactionalMemory { transaction_id: TransactionId, eventual: bool, two_phase: bool, - allow_trim: bool, ) -> Result { let result = self.commit_inner( data_root, @@ -475,7 +606,6 @@ impl TransactionalMemory { transaction_id, eventual, two_phase, - allow_trim, ); if result.is_err() { self.needs_recovery.store(true, Ordering::Release); @@ -492,7 +622,6 @@ impl TransactionalMemory { transaction_id: TransactionId, eventual: bool, two_phase: bool, - allow_trim: bool, ) -> Result { // All mutable pages must be dropped, this ensures that when a transaction completes // no more writes can happen to the pages it allocated. Thus it is safe to make them visible @@ -503,11 +632,7 @@ impl TransactionalMemory { let mut state = self.state.lock().unwrap(); // Trim surplus file space, before finalizing the commit - let shrunk = if allow_trim { - Self::try_shrink(&mut state)? - } else { - false - }; + let shrunk = Self::try_shrink(&mut state)?; // Copy the header so that we can release the state lock, while we flush the file let mut header = state.header.clone(); drop(state); @@ -519,18 +644,21 @@ impl TransactionalMemory { secondary.system_root = system_root; secondary.freed_root = freed_root; - self.write_header(&header, false)?; + self.write_header(&header)?; // Use 2-phase commit, if checksums are disabled if two_phase { self.storage.flush(eventual)?; } - // Swap the primary bit on-disk - self.write_header(&header, true)?; - self.storage.flush(eventual)?; - // Only swap the in-memory primary bit after the fsync is successful + // Make our new commit the primary, and record whether it was a 2-phase commit. 
+ // These two bits need to be written atomically header.swap_primary_slot(); + header.two_phase_commit = two_phase; + + // Write the new header to disk + self.write_header(&header)?; + self.storage.flush(eventual)?; if shrunk { let result = self.storage.resize(header.layout().len()); @@ -803,7 +931,12 @@ impl TransactionalMemory { self.allocated_since_commit.lock().unwrap().contains(&page) } - pub(crate) fn allocate_helper(&self, allocation_size: usize, lowest: bool) -> Result { + pub(crate) fn allocate_helper( + &self, + allocation_size: usize, + lowest: bool, + transactional: bool, + ) -> Result { let required_pages = (allocation_size + self.get_page_size() - 1) / self.get_page_size(); let required_order = ceil_log2(required_pages); @@ -831,10 +964,12 @@ impl TransactionalMemory { assert!(!self.open_dirty_pages.lock().unwrap().contains(&page_number)); } - self.allocated_since_commit - .lock() - .unwrap() - .insert(page_number); + if transactional { + self.allocated_since_commit + .lock() + .unwrap() + .insert(page_number); + } let address_range = page_number.address_range( self.page_size.into(), @@ -966,11 +1101,17 @@ impl TransactionalMemory { } pub(crate) fn allocate(&self, allocation_size: usize) -> Result { - self.allocate_helper(allocation_size, false) + self.allocate_helper(allocation_size, false, true) } pub(crate) fn allocate_lowest(&self, allocation_size: usize) -> Result { - self.allocate_helper(allocation_size, true) + self.allocate_helper(allocation_size, true, true) + } + + // Allocate a page not associated with any transaction. The page is immediately considered committed, + // and won't be rolled back if an abort happens. This is only used for the region tracker + fn allocate_non_transactional(&self, allocation_size: usize) -> Result { + self.allocate_helper(allocation_size, false, false) } pub(crate) fn count_allocated_pages(&self) -> Result { @@ -1010,7 +1151,6 @@ impl Drop for TransactionalMemory { non_durable_transaction_id, false, true, - true, ) .is_err() { @@ -1024,24 +1164,15 @@ impl Drop for TransactionalMemory { warn!("Failure while finalizing non-durable commit. Database may have rolled back"); } } - let mut state = self.state.lock().unwrap(); - let tracker_len = state.allocators.region_tracker.to_vec().len(); - let tracker_page = state.header.region_tracker(); - if tracker_page.page_size_bytes(self.page_size) < (tracker_len as u64) { - drop(state); - // Allocate a larger tracker page - self.free(tracker_page); - if let Ok(tracker_page) = self.allocate(tracker_len) { - state = self.state.lock().unwrap(); - state - .header - .set_region_tracker(tracker_page.get_page_number()); - } else { - #[cfg(feature = "logging")] - warn!("Failure while flushing allocator state. Repair required at restart."); - return; - } + + // Allocate a larger region-tracker page if necessary + if self.ensure_region_tracker_page().is_err() { + #[cfg(feature = "logging")] + warn!("Failure while flushing allocator state. 
Repair required at restart."); + return; } + + let mut state = self.state.lock().unwrap(); if state .allocators .flush_to( @@ -1058,7 +1189,7 @@ impl Drop for TransactionalMemory { if self.storage.flush(false).is_ok() && !self.needs_recovery.load(Ordering::Acquire) { state.header.recovery_required = false; - let _ = self.write_header(&state.header, false); + let _ = self.write_header(&state.header); let _ = self.storage.flush(false); } } diff --git a/src/tree_store/table_tree.rs b/src/tree_store/table_tree.rs index fc964563..0d6d26a1 100644 --- a/src/tree_store/table_tree.rs +++ b/src/tree_store/table_tree.rs @@ -389,7 +389,7 @@ impl<'txn> TableTreeMut<'txn> { Ok(true) } - pub(crate) fn flush_table_root_updates(&mut self) -> Result> { + pub(crate) fn flush_table_root_updates(&mut self) -> Result<&mut Self> { for (name, (new_root, new_length)) in self.pending_table_updates.drain() { // Bypass .get_table() since the table types are dynamic let mut definition = self.tree.get(&name.as_str())?.unwrap().value(); @@ -418,8 +418,7 @@ impl<'txn> TableTreeMut<'txn> { fixed_key_size, fixed_value_size, ); - tree.finalize_dirty_checksums()?; - *table_root = tree.get_root(); + *table_root = tree.finalize_dirty_checksums()?; *table_length = new_length; } InternalTableDefinition::Multimap { @@ -440,8 +439,50 @@ impl<'txn> TableTreeMut<'txn> { } self.tree.insert(&name.as_str(), &definition)?; } - self.tree.finalize_dirty_checksums()?; - Ok(self.tree.get_root()) + Ok(self) + } + + // Creates a new table, calls the provided closure to insert entries into it, and then + // flushes the table root. The flush is done using insert_inplace(), so it's guaranteed + // that no pages will be allocated or freed after the closure returns + pub(crate) fn create_table_and_flush_table_root( + &mut self, + name: &str, + f: impl FnOnce(&mut BtreeMut) -> Result, + ) -> Result { + assert!(self.pending_table_updates.is_empty()); + assert!(self.tree.get(&name)?.is_none()); + + // Reserve space in the table tree + self.tree.insert( + &name, + &InternalTableDefinition::new::(TableType::Normal, None, 0), + )?; + + // Create an empty table and call the provided closure on it + let mut tree: BtreeMut = BtreeMut::new( + None, + self.guard.clone(), + self.mem.clone(), + self.freed_pages.clone(), + ); + f(&mut tree)?; + + // Finalize the table's checksums + let table_root = tree.finalize_dirty_checksums()?; + let table_length = tree.get_root().map(|x| x.length).unwrap_or_default(); + + // Flush the root to the table tree, without allocating + self.tree.insert_inplace( + &name, + &InternalTableDefinition::new::(TableType::Normal, table_root, table_length), + )?; + + Ok(()) + } + + pub(crate) fn finalize_dirty_checksums(&mut self) -> Result> { + self.tree.finalize_dirty_checksums() } // root_page: the root of the master table diff --git a/src/tree_store/table_tree_base.rs b/src/tree_store/table_tree_base.rs index b3f3dbd1..e2bae031 100644 --- a/src/tree_store/table_tree_base.rs +++ b/src/tree_store/table_tree_base.rs @@ -441,6 +441,10 @@ impl Value for InternalTableDefinition { } } + // Be careful if you change this serialization format! The InternalTableDefinition for + // a given table needs to have a consistent serialized length, regardless of the table + // contents, so that create_table_and_flush_table_root() can update the allocator state + // table without causing more allocations fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Vec where Self: 'b,