From 6a1cd05a4db42a10c2255d3ef6c6678e53a9ac7d Mon Sep 17 00:00:00 2001 From: Michael Constant Date: Sat, 26 Oct 2024 01:54:53 -0700 Subject: [PATCH 1/6] Remove swap_primary, which is no longer needed Now that commit() has its own in-memory copy of the database header, there's no need for the special-case swap_primary flag --- src/tree_store/page_store/header.rs | 9 +------ src/tree_store/page_store/page_manager.rs | 31 ++++++++++++----------- 2 files changed, 17 insertions(+), 23 deletions(-) diff --git a/src/tree_store/page_store/header.rs b/src/tree_store/page_store/header.rs index 40b67a88..81820fd2 100644 --- a/src/tree_store/page_store/header.rs +++ b/src/tree_store/page_store/header.rs @@ -242,19 +242,12 @@ impl DatabaseHeader { Ok((result, repair)) } - pub(super) fn to_bytes( - &self, - include_magic_number: bool, - swap_primary: bool, - ) -> [u8; DB_HEADER_SIZE] { + pub(super) fn to_bytes(&self, include_magic_number: bool) -> [u8; DB_HEADER_SIZE] { let mut result = [0; DB_HEADER_SIZE]; if include_magic_number { result[..MAGICNUMBER.len()].copy_from_slice(&MAGICNUMBER); } result[GOD_BYTE_OFFSET] = self.primary_slot.try_into().unwrap(); - if swap_primary { - result[GOD_BYTE_OFFSET] ^= PRIMARY_BIT; - } if self.recovery_required { result[GOD_BYTE_OFFSET] |= RECOVERY_REQUIRED; } diff --git a/src/tree_store/page_store/page_manager.rs b/src/tree_store/page_store/page_manager.rs index 04f29aa6..61557af6 100644 --- a/src/tree_store/page_store/page_manager.rs +++ b/src/tree_store/page_store/page_manager.rs @@ -192,7 +192,7 @@ impl TransactionalMemory { storage .write(0, DB_HEADER_SIZE, true)? .mem_mut() - .copy_from_slice(&header.to_bytes(false, false)); + .copy_from_slice(&header.to_bytes(false)); allocators.flush_to(tracker_page, layout, &storage)?; storage.flush(false)?; @@ -201,7 +201,7 @@ impl TransactionalMemory { storage .write(0, DB_HEADER_SIZE, true)? .mem_mut() - .copy_from_slice(&header.to_bytes(true, false)); + .copy_from_slice(&header.to_bytes(true)); storage.flush(false)?; } let header_bytes = storage.read_direct(0, DB_HEADER_SIZE)?; @@ -237,7 +237,7 @@ impl TransactionalMemory { storage .write(0, DB_HEADER_SIZE, true)? .mem_mut() - .copy_from_slice(&header.to_bytes(true, false)); + .copy_from_slice(&header.to_bytes(true)); storage.flush(false)?; } @@ -316,7 +316,7 @@ impl TransactionalMemory { self.storage .write(0, DB_HEADER_SIZE, true)? .mem_mut() - .copy_from_slice(&header.to_bytes(true, false)); + .copy_from_slice(&header.to_bytes(true)); self.storage.flush(false)?; } @@ -331,7 +331,7 @@ impl TransactionalMemory { let mut state = self.state.lock().unwrap(); assert!(!state.header.recovery_required); state.header.recovery_required = true; - self.write_header(&state.header, false)?; + self.write_header(&state.header)?; self.storage.flush(false) } @@ -367,11 +367,11 @@ impl TransactionalMemory { allocator.record_alloc(page_number.page_index, page_number.page_order); } - fn write_header(&self, header: &DatabaseHeader, swap_primary: bool) -> Result { + fn write_header(&self, header: &DatabaseHeader) -> Result { self.storage .write(0, DB_HEADER_SIZE, true)? 
.mem_mut() - .copy_from_slice(&header.to_bytes(true, swap_primary)); + .copy_from_slice(&header.to_bytes(true)); Ok(()) } @@ -391,7 +391,7 @@ impl TransactionalMemory { let mut state = self.state.lock().unwrap(); state.header.set_region_tracker(new_tracker_page); - self.write_header(&state.header, false)?; + self.write_header(&state.header)?; self.storage.flush(false)?; } else { allocator.record_alloc(tracker_page.page_index, tracker_page.page_order); @@ -405,7 +405,7 @@ impl TransactionalMemory { .flush_to(tracker_page, state.header.layout(), &self.storage)?; state.header.recovery_required = false; - self.write_header(&state.header, false)?; + self.write_header(&state.header)?; let result = self.storage.flush(false); self.needs_recovery.store(false, Ordering::Release); @@ -519,19 +519,20 @@ impl TransactionalMemory { secondary.system_root = system_root; secondary.freed_root = freed_root; - self.write_header(&header, false)?; + self.write_header(&header)?; // Use 2-phase commit, if checksums are disabled if two_phase { self.storage.flush(eventual)?; } - // Swap the primary bit on-disk - self.write_header(&header, true)?; - self.storage.flush(eventual)?; - // Only swap the in-memory primary bit after the fsync is successful + // Make our new commit the primary header.swap_primary_slot(); + // Write the new header to disk + self.write_header(&header)?; + self.storage.flush(eventual)?; + if shrunk { let result = self.storage.resize(header.layout().len()); if result.is_err() { @@ -1058,7 +1059,7 @@ impl Drop for TransactionalMemory { if self.storage.flush(false).is_ok() && !self.needs_recovery.load(Ordering::Acquire) { state.header.recovery_required = false; - let _ = self.write_header(&state.header, false); + let _ = self.write_header(&state.header); let _ = self.storage.flush(false); } } From a7ea436f1c2b0dbaef3788199474b0253bd83d0a Mon Sep 17 00:00:00 2001 From: Michael Constant Date: Wed, 6 Nov 2024 18:00:09 -0800 Subject: [PATCH 2/6] Move commit() out of do_repair() Instead of actually making a commit, do_repair() should just return the new roots that would be committed. This makes check_integrity() much faster, since it no longer needs to verify the checksums twice, and it means commit() can always trim the database without worrying about whether it'll change the allocator hash. It also allows check_integrity() to avoid making an unnecessary commit if the database is clean, which is nice with quick-repair. It means a successful check_integrity() won't invalidate the allocator state table, so you can still do a quick-repair afterwards --- src/db.rs | 54 ++++++++++++++--------- src/transactions.rs | 1 - src/tree_store/page_store/page_manager.rs | 10 +---- 3 files changed, 35 insertions(+), 30 deletions(-) diff --git a/src/db.rs b/src/db.rs index 1ff1ffde..686184aa 100644 --- a/src/db.rs +++ b/src/db.rs @@ -386,17 +386,34 @@ impl Database { .unwrap() .clear_cache_and_reload()?; - if !Self::verify_primary_checksums(self.mem.clone())? 
{ - was_clean = false; - } + let old_roots = [ + self.mem.get_data_root(), + self.mem.get_system_root(), + self.mem.get_freed_root(), + ]; - Self::do_repair(&mut self.mem, &|_| {}).map_err(|err| match err { + let new_roots = Self::do_repair(&mut self.mem, &|_| {}).map_err(|err| match err { DatabaseError::Storage(storage_err) => storage_err, _ => unreachable!(), })?; - if allocator_hash != self.mem.allocator_hash() { + + if old_roots != new_roots || allocator_hash != self.mem.allocator_hash() { was_clean = false; } + + if !was_clean { + let next_transaction_id = self.mem.get_last_committed_transaction_id()?.next(); + let [data_root, system_root, freed_root] = new_roots; + self.mem.commit( + data_root, + system_root, + freed_root, + next_transaction_id, + false, + true, + )?; + } + self.mem.begin_writable()?; Ok(was_clean) @@ -592,7 +609,7 @@ impl Database { fn do_repair( mem: &mut Arc, // Only &mut to ensure exclusivity repair_callback: &(dyn Fn(&mut RepairSession) + 'static), - ) -> Result<(), DatabaseError> { + ) -> Result<[Option; 3], DatabaseError> { if !Self::verify_primary_checksums(mem.clone())? { // 0.3 because the repair takes 3 full scans and the first is done now let mut handle = RepairSession::new(0.3); @@ -662,19 +679,7 @@ impl Database { // by storing an empty root during the below commit() mem.clear_read_cache(); - let transaction_id = mem.get_last_committed_transaction_id()?.next(); - mem.commit( - data_root, - system_root, - freed_root, - transaction_id, - false, - true, - // don't trim the database file, because we want the allocator hash to match exactly - false, - )?; - - Ok(()) + Ok([data_root, system_root, freed_root]) } fn new( @@ -705,7 +710,16 @@ impl Database { if handle.aborted() { return Err(DatabaseError::RepairAborted); } - Self::do_repair(&mut mem, repair_callback)?; + let [data_root, system_root, freed_root] = Self::do_repair(&mut mem, repair_callback)?; + let next_transaction_id = mem.get_last_committed_transaction_id()?.next(); + mem.commit( + data_root, + system_root, + freed_root, + next_transaction_id, + false, + true, + )?; } mem.begin_writable()?; diff --git a/src/transactions.rs b/src/transactions.rs index 732cbfc1..88f8bc18 100644 --- a/src/transactions.rs +++ b/src/transactions.rs @@ -1103,7 +1103,6 @@ impl WriteTransaction { self.transaction_id, eventual, two_phase, - true, )?; // Mark any pending non-durable commits as fully committed. diff --git a/src/tree_store/page_store/page_manager.rs b/src/tree_store/page_store/page_manager.rs index 61557af6..80fd6fab 100644 --- a/src/tree_store/page_store/page_manager.rs +++ b/src/tree_store/page_store/page_manager.rs @@ -466,7 +466,6 @@ impl TransactionalMemory { transaction_id: TransactionId, eventual: bool, two_phase: bool, - allow_trim: bool, ) -> Result { let result = self.commit_inner( data_root, @@ -475,7 +474,6 @@ impl TransactionalMemory { transaction_id, eventual, two_phase, - allow_trim, ); if result.is_err() { self.needs_recovery.store(true, Ordering::Release); @@ -492,7 +490,6 @@ impl TransactionalMemory { transaction_id: TransactionId, eventual: bool, two_phase: bool, - allow_trim: bool, ) -> Result { // All mutable pages must be dropped, this ensures that when a transaction completes // no more writes can happen to the pages it allocated. Thus it is safe to make them visible @@ -503,11 +500,7 @@ impl TransactionalMemory { let mut state = self.state.lock().unwrap(); // Trim surplus file space, before finalizing the commit - let shrunk = if allow_trim { - Self::try_shrink(&mut state)? 
- } else { - false - }; + let shrunk = Self::try_shrink(&mut state)?; // Copy the header so that we can release the state lock, while we flush the file let mut header = state.header.clone(); drop(state); @@ -1011,7 +1004,6 @@ impl Drop for TransactionalMemory { non_durable_transaction_id, false, true, - true, ) .is_err() { From d4373cc920b10a72a1f4104d65636d3023e11eca Mon Sep 17 00:00:00 2001 From: Michael Constant Date: Thu, 24 Oct 2024 17:26:07 -0700 Subject: [PATCH 3/6] Fix typos in header layout docs and Durability descriptions --- docs/design.md | 2 +- src/transactions.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/design.md b/docs/design.md index b584d7ee..882beb6c 100644 --- a/docs/design.md +++ b/docs/design.md @@ -31,8 +31,8 @@ database file. | magic number | | magic con.| god byte | padding | page size | | region header pages | region max data pages | -| region tracker page number | | number of full regions | data pages in trailing region | +| region tracker page number | | padding | | padding | | padding | diff --git a/src/transactions.rs b/src/transactions.rs index 88f8bc18..77a2dc8f 100644 --- a/src/transactions.rs +++ b/src/transactions.rs @@ -167,9 +167,9 @@ pub enum Durability { /// the ability to cause the database process to crash, can cause invalid data to be written /// with a valid checksum, leaving the database in an invalid, attacker-controlled state. Immediate, - /// Commits with this durability level have the same gaurantees as [`Durability::Immediate`] + /// Commits with this durability level have the same guarantees as [`Durability::Immediate`] /// - /// Additionally, aata is written with the following 2-phase commit algorithm: + /// Additionally, data is written with the following 2-phase commit algorithm: /// /// 1. Update the inactive commit slot with the new database state /// 2. 
Call `fsync` to ensure the database slate and commit slot update have been persisted From ee310a84b5b61f9128133245af27548622aff2f7 Mon Sep 17 00:00:00 2001 From: Michael Constant Date: Fri, 15 Nov 2024 01:39:44 -0800 Subject: [PATCH 4/6] Add set_two_phase_commit(), deprecate Durability::Paranoid --- src/db.rs | 26 ++++----- src/transactions.rs | 136 +++++++++++++++++++++++++------------------- 2 files changed, 91 insertions(+), 71 deletions(-) diff --git a/src/db.rs b/src/db.rs index 686184aa..e2ba23fa 100644 --- a/src/db.rs +++ b/src/db.rs @@ -5,9 +5,7 @@ use crate::tree_store::{ TableType, TransactionalMemory, PAGE_SIZE, }; use crate::types::{Key, Value}; -use crate::{ - CompactionError, DatabaseError, Durability, ReadOnlyTable, SavepointError, StorageError, -}; +use crate::{CompactionError, DatabaseError, ReadOnlyTable, SavepointError, StorageError}; use crate::{ReadTransaction, Result, WriteTransaction}; use std::fmt::{Debug, Display, Formatter}; @@ -439,11 +437,11 @@ impl Database { if self.transaction_tracker.any_savepoint_exists() { return Err(CompactionError::EphemeralSavepointExists); } - txn.set_durability(Durability::Paranoid); + txn.set_two_phase_commit(true); txn.commit().map_err(|e| e.into_storage_error())?; // Repeat, just in case executing list_persistent_savepoints() created a new table let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?; - txn.set_durability(Durability::Paranoid); + txn.set_two_phase_commit(true); txn.commit().map_err(|e| e.into_storage_error())?; // There can't be any outstanding transactions because we have a `&mut self`, so all pending free pages // should have been cleared out by the above commit() @@ -464,7 +462,7 @@ impl Database { // Double commit to free up the relocated pages for reuse let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?; - txn.set_durability(Durability::Paranoid); + txn.set_two_phase_commit(true); txn.commit().map_err(|e| e.into_storage_error())?; assert!(self.mem.get_freed_root().is_none()); @@ -1176,7 +1174,7 @@ mod test { let table_def: TableDefinition = TableDefinition::new("x"); let mut tx = db.begin_write().unwrap(); - tx.set_durability(Durability::Paranoid); + tx.set_two_phase_commit(true); let savepoint0 = tx.ephemeral_savepoint().unwrap(); { tx.open_table(table_def).unwrap(); @@ -1184,7 +1182,7 @@ mod test { tx.commit().unwrap(); let mut tx = db.begin_write().unwrap(); - tx.set_durability(Durability::Paranoid); + tx.set_two_phase_commit(true); let savepoint1 = tx.ephemeral_savepoint().unwrap(); tx.restore_savepoint(&savepoint0).unwrap(); tx.set_durability(Durability::None); @@ -1196,7 +1194,7 @@ mod test { tx.commit().unwrap(); let mut tx = db.begin_write().unwrap(); - tx.set_durability(Durability::Paranoid); + tx.set_two_phase_commit(true); tx.restore_savepoint(&savepoint0).unwrap(); { tx.open_table(table_def).unwrap(); @@ -1204,7 +1202,7 @@ mod test { tx.commit().unwrap(); let mut tx = db.begin_write().unwrap(); - tx.set_durability(Durability::Paranoid); + tx.set_two_phase_commit(true); let savepoint2 = tx.ephemeral_savepoint().unwrap(); drop(savepoint0); tx.restore_savepoint(&savepoint2).unwrap(); @@ -1217,7 +1215,7 @@ mod test { tx.commit().unwrap(); let mut tx = db.begin_write().unwrap(); - tx.set_durability(Durability::Paranoid); + tx.set_two_phase_commit(true); let savepoint3 = tx.ephemeral_savepoint().unwrap(); drop(savepoint1); tx.restore_savepoint(&savepoint3).unwrap(); @@ -1227,7 +1225,7 @@ mod test { tx.commit().unwrap(); let mut tx = db.begin_write().unwrap(); - 
tx.set_durability(Durability::Paranoid); + tx.set_two_phase_commit(true); let savepoint4 = tx.ephemeral_savepoint().unwrap(); drop(savepoint2); tx.restore_savepoint(&savepoint3).unwrap(); @@ -1239,7 +1237,7 @@ mod test { tx.abort().unwrap(); let mut tx = db.begin_write().unwrap(); - tx.set_durability(Durability::Paranoid); + tx.set_two_phase_commit(true); let savepoint5 = tx.ephemeral_savepoint().unwrap(); drop(savepoint3); assert!(tx.restore_savepoint(&savepoint4).is_err()); @@ -1249,7 +1247,7 @@ mod test { tx.commit().unwrap(); let mut tx = db.begin_write().unwrap(); - tx.set_durability(Durability::Paranoid); + tx.set_two_phase_commit(true); tx.restore_savepoint(&savepoint5).unwrap(); tx.set_durability(Durability::None); { diff --git a/src/transactions.rs b/src/transactions.rs index 77a2dc8f..2a6cc962 100644 --- a/src/transactions.rs +++ b/src/transactions.rs @@ -151,48 +151,22 @@ pub enum Durability { Eventual, /// Commits with this durability level are guaranteed to be persistent as soon as /// [`WriteTransaction::commit`] returns. - /// - /// Data is written with checksums, with the following commit algorithm: - /// - /// 1. Update the inactive commit slot with the new database state - /// 2. Flip the god byte primary bit to activate the newly updated commit slot - /// 3. Call `fsync` to ensure all writes have been persisted to disk - /// - /// When opening the database after a crash, the most recent of the two commit slots with a - /// valid checksum is used. - /// - /// Security considerations: The checksum used is xxhash, a fast, non-cryptographic hash - /// function with close to perfect collision resistance when used with non-malicious input. An - /// attacker with an extremely high degree of control over the database's workload, including - /// the ability to cause the database process to crash, can cause invalid data to be written - /// with a valid checksum, leaving the database in an invalid, attacker-controlled state. Immediate, - /// Commits with this durability level have the same guarantees as [`Durability::Immediate`] - /// - /// Additionally, data is written with the following 2-phase commit algorithm: - /// - /// 1. Update the inactive commit slot with the new database state - /// 2. Call `fsync` to ensure the database slate and commit slot update have been persisted - /// 3. Flip the god byte primary bit to activate the newly updated commit slot - /// 4. Call `fsync` to ensure the write to the god byte has been persisted - /// - /// This mitigates a theoretical attack where an attacker who - /// 1. can control the order in which pages are flushed to disk - /// 2. can introduce crashes during `fsync`, - /// 3. has knowledge of the database file contents, and - /// 4. can include arbitrary data in a write transaction - /// - /// could cause a transaction to partially commit (some but not all of the data is written). - /// This is described in the design doc in futher detail. - /// - /// Security considerations: Many hard disk drives and SSDs do not actually guarantee that data - /// has been persisted to disk after calling `fsync`. Even with this commit level, an attacker - /// with a high degree of control over the database's workload, including the ability to cause - /// the database process to crash, can cause the database to crash with the god byte primary bit - /// pointing to an invalid commit slot, leaving the database in an invalid, potentially attacker-controlled state. + /// This is identical to `Durability::Immediate`, but also enables 2-phase commit. 
New code + /// should call `set_two_phase_commit(true)` directly instead. + #[deprecated(since = "2.3.0", note = "use set_two_phase_commit(true) instead")] Paranoid, } +// These are the actual durability levels used internally. `Durability::Paranoid` is translated +// to `InternalDurability::Immediate`, and also enables 2-phase commit +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +enum InternalDurability { + None, + Eventual, + Immediate, +} + // Like a Table but only one may be open at a time to avoid possible races pub struct SystemTable<'db, 's, K: Key + 'static, V: Value + 'static> { name: String, @@ -447,7 +421,8 @@ pub struct WriteTransaction { system_tables: Mutex>, completed: bool, dirty: AtomicBool, - durability: Durability, + durability: InternalDurability, + two_phase_commit: bool, // Persistent savepoints created during this transaction created_persistent_savepoints: Mutex>, deleted_persistent_savepoints: Mutex>, @@ -504,7 +479,8 @@ impl WriteTransaction { post_commit_frees, completed: false, dirty: AtomicBool::new(false), - durability: Durability::Immediate, + durability: InternalDurability::Immediate, + two_phase_commit: false, created_persistent_savepoints: Mutex::new(Default::default()), deleted_persistent_savepoints: Mutex::new(vec![]), }) @@ -595,10 +571,7 @@ impl WriteTransaction { /// Returns `[SavepointError::InvalidSavepoint`], if the transaction is "dirty" (any tables have been opened) /// or if the transaction's durability is less than `[Durability::Immediate]` pub fn persistent_savepoint(&self) -> Result { - if !matches!( - self.durability, - Durability::Immediate | Durability::Paranoid - ) { + if self.durability != InternalDurability::Immediate { return Err(SavepointError::InvalidSavepoint); } @@ -659,10 +632,7 @@ impl WriteTransaction { /// Returns `true` if the savepoint existed /// Returns `[SavepointError::InvalidSavepoint`] if the transaction's durability is less than `[Durability::Immediate]` pub fn delete_persistent_savepoint(&self, id: u64) -> Result { - if !matches!( - self.durability, - Durability::Immediate | Durability::Paranoid - ) { + if self.durability != InternalDurability::Immediate { return Err(SavepointError::InvalidSavepoint); } let mut system_tables = self.system_tables.lock().unwrap(); @@ -904,7 +874,60 @@ impl WriteTransaction { .unwrap() .is_empty(); assert!(no_created && no_deleted); - self.durability = durability; + + self.durability = match durability { + Durability::None => InternalDurability::None, + Durability::Eventual => InternalDurability::Eventual, + Durability::Immediate => InternalDurability::Immediate, + #[allow(deprecated)] + Durability::Paranoid => { + self.set_two_phase_commit(true); + InternalDurability::Immediate + } + }; + } + + /// Enable or disable 2-phase commit (defaults to disabled) + /// + /// By default, data is written using the following 1-phase commit algorithm: + /// + /// 1. Update the inactive commit slot with the new database state + /// 2. Flip the god byte primary bit to activate the newly updated commit slot + /// 3. Call `fsync` to ensure all writes have been persisted to disk + /// + /// All data is written with checksums. When opening the database after a crash, the most + /// recent of the two commit slots with a valid checksum is used. + /// + /// Security considerations: The checksum used is xxhash, a fast, non-cryptographic hash + /// function with close to perfect collision resistance when used with non-malicious input. 
An
+    /// attacker with an extremely high degree of control over the database's workload, including
+    /// the ability to cause the database process to crash, can cause invalid data to be written
+    /// with a valid checksum, leaving the database in an invalid, attacker-controlled state.
+    ///
+    /// Alternatively, you can enable 2-phase commit, which writes data like this:
+    ///
+    /// 1. Update the inactive commit slot with the new database state
+    /// 2. Call `fsync` to ensure the database state and commit slot update have been persisted
+    /// 3. Flip the god byte primary bit to activate the newly updated commit slot
+    /// 4. Call `fsync` to ensure the write to the god byte has been persisted
+    ///
+    /// This mitigates a theoretical attack where an attacker who
+    /// 1. can control the order in which pages are flushed to disk
+    /// 2. can introduce crashes during `fsync`,
+    /// 3. has knowledge of the database file contents, and
+    /// 4. can include arbitrary data in a write transaction
+    ///
+    /// could cause a transaction to partially commit (some but not all of the data is written).
+    /// This is described in the design doc in further detail.
+    ///
+    /// Security considerations: Many hard disk drives and SSDs do not actually guarantee that data
+    /// has been persisted to disk after calling `fsync`. Even with 2-phase commit, an attacker with
+    /// a high degree of control over the database's workload, including the ability to cause the
+    /// database process to crash, can cause the database to crash with the god byte primary bit
+    /// pointing to an invalid commit slot, leaving the database in an invalid, potentially
+    /// attacker-controlled state.
+    pub fn set_two_phase_commit(&mut self, enabled: bool) {
+        self.two_phase_commit = enabled;
     }

     /// Open the given table
@@ -1002,14 +1025,13 @@ impl WriteTransaction {
     fn commit_inner(&mut self) -> Result<(), CommitError> {
         #[cfg(feature = "logging")]
         debug!(
-            "Committing transaction id={:?} with durability={:?}",
-            self.transaction_id, self.durability
+            "Committing transaction id={:?} with durability={:?} two_phase={}",
+            self.transaction_id, self.durability, self.two_phase_commit
         );
         match self.durability {
-            Durability::None => self.non_durable_commit()?,
-            Durability::Eventual => self.durable_commit(true, false)?,
-            Durability::Immediate => self.durable_commit(false, false)?,
-            Durability::Paranoid => self.durable_commit(false, true)?,
+            InternalDurability::None => self.non_durable_commit()?,
+            InternalDurability::Eventual => self.durable_commit(true)?,
+            InternalDurability::Immediate => self.durable_commit(false)?,
         }

         for (savepoint, transaction) in self.deleted_persistent_savepoints.lock().unwrap().iter() {
@@ -1062,7 +1084,7 @@ impl WriteTransaction {
         Ok(())
     }

-    pub(crate) fn durable_commit(&mut self, eventual: bool, two_phase: bool) -> Result {
+    pub(crate) fn durable_commit(&mut self, eventual: bool) -> Result {
         let free_until_transaction = self
             .transaction_tracker
             .oldest_live_read_transaction()
@@ -1102,7 +1124,7 @@ impl WriteTransaction {
             freed_root,
             self.transaction_id,
             eventual,
-            two_phase,
+            self.two_phase_commit,
         )?;

         // Mark any pending non-durable commits as fully committed.
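
Migration note for callers: code that asked for 2-phase commit through the deprecated
durability level would move to the new API roughly as sketched below. This is a minimal,
illustrative example rather than part of the patch; the table definition and error
plumbing are hypothetical.

    use redb::{Database, Durability, TableDefinition};

    const TABLE: TableDefinition<&str, u64> = TableDefinition::new("example");

    fn commit_with_two_phase(db: &Database) -> Result<(), redb::Error> {
        let mut txn = db.begin_write()?;
        // Before this patch: txn.set_durability(Durability::Paranoid);
        // After this patch, 2-phase commit is orthogonal to the durability level:
        txn.set_durability(Durability::Immediate); // the default, shown for clarity
        txn.set_two_phase_commit(true);
        txn.open_table(TABLE)?.insert("key", 42)?;
        txn.commit()?; // slot write, then primary-bit flip, each followed by fsync
        Ok(())
    }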
From bff1e43f8ca34f623b5b7abd6b01534bb2b3f1e8 Mon Sep 17 00:00:00 2001 From: Michael Constant Date: Thu, 24 Oct 2024 16:57:28 -0700 Subject: [PATCH 5/6] Move finalize_dirty_checksums() out of flush_table_root_updates() Refactoring in preparation for quick-repair, which needs to be able to call these separately --- src/multimap_table.rs | 8 ++++---- src/transactions.rs | 29 +++++++++++++++++------------ src/tree_store/btree.rs | 13 ++++++------- src/tree_store/table_tree.rs | 12 +++++++----- 4 files changed, 34 insertions(+), 28 deletions(-) diff --git a/src/multimap_table.rs b/src/multimap_table.rs index 5c24c3a3..ac42a7cc 100644 --- a/src/multimap_table.rs +++ b/src/multimap_table.rs @@ -308,8 +308,8 @@ pub(crate) fn finalize_tree_and_subtree_checksums( value_size, <()>::fixed_width(), ); - subtree.finalize_dirty_checksums()?; - sub_root_updates.push((i, entry.key().to_vec(), subtree.get_root().unwrap())); + let subtree_root = subtree.finalize_dirty_checksums()?.unwrap(); + sub_root_updates.push((i, entry.key().to_vec(), subtree_root)); } } } @@ -327,10 +327,10 @@ pub(crate) fn finalize_tree_and_subtree_checksums( Ok(()) })?; - tree.finalize_dirty_checksums()?; + let root = tree.finalize_dirty_checksums()?; // No pages should have been freed by this operation assert!(freed_pages.lock().unwrap().is_empty()); - Ok(tree.get_root()) + Ok(root) } fn parse_subtree_roots( diff --git a/src/transactions.rs b/src/transactions.rs index 2a6cc962..63c9e8e6 100644 --- a/src/transactions.rs +++ b/src/transactions.rs @@ -1095,14 +1095,16 @@ impl WriteTransaction { .lock() .unwrap() .table_tree - .flush_table_root_updates()?; + .flush_table_root_updates()? + .finalize_dirty_checksums()?; let system_root = self .system_tables .lock() .unwrap() .table_tree - .flush_table_root_updates()?; + .flush_table_root_updates()? + .finalize_dirty_checksums()?; self.process_freed_pages(free_until_transaction)?; // If a savepoint exists it might reference the freed-tree, since it holds a reference to the @@ -1113,10 +1115,7 @@ impl WriteTransaction { self.store_freed_pages(savepoint_exists)?; // Finalize freed table checksums, before doing the final commit - // user & system table trees were already finalized when we flushed the pending roots above - self.freed_tree.lock().unwrap().finalize_dirty_checksums()?; - - let freed_root = self.freed_tree.lock().unwrap().get_root(); + let freed_root = self.freed_tree.lock().unwrap().finalize_dirty_checksums()?; self.mem.commit( user_root, @@ -1146,23 +1145,23 @@ impl WriteTransaction { .lock() .unwrap() .table_tree - .flush_table_root_updates()?; + .flush_table_root_updates()? + .finalize_dirty_checksums()?; let system_root = self .system_tables .lock() .unwrap() .table_tree - .flush_table_root_updates()?; + .flush_table_root_updates()? 
+ .finalize_dirty_checksums()?; // Store all freed pages for a future commit(), since we can't free pages during a // non-durable commit (it's non-durable, so could be rolled back anytime in the future) self.store_freed_pages(true)?; // Finalize all checksums, before doing the final commit - self.freed_tree.lock().unwrap().finalize_dirty_checksums()?; - - let freed_root = self.freed_tree.lock().unwrap().get_root(); + let freed_root = self.freed_tree.lock().unwrap().finalize_dirty_checksums()?; self.mem .non_durable_commit(user_root, system_root, freed_root, self.transaction_id)?; @@ -1350,7 +1349,13 @@ impl WriteTransaction { pub(crate) fn print_debug(&self) -> Result { // Flush any pending updates to make sure we get the latest root let mut tables = self.tables.lock().unwrap(); - if let Some(page) = tables.table_tree.flush_table_root_updates().unwrap() { + if let Some(page) = tables + .table_tree + .flush_table_root_updates() + .unwrap() + .finalize_dirty_checksums() + .unwrap() + { eprintln!("Master tree:"); let master_tree: Btree<&str, InternalTableDefinition> = Btree::new( Some(page), diff --git a/src/tree_store/btree.rs b/src/tree_store/btree.rs index 991bfad6..ac3bc784 100644 --- a/src/tree_store/btree.rs +++ b/src/tree_store/btree.rs @@ -152,7 +152,7 @@ impl UntypedBtreeMut { } // Recomputes the checksum for all pages that are uncommitted - pub(crate) fn finalize_dirty_checksums(&mut self) -> Result { + pub(crate) fn finalize_dirty_checksums(&mut self) -> Result> { let mut root = self.root; if let Some(BtreeHeader { root: ref p, @@ -162,14 +162,14 @@ impl UntypedBtreeMut { { if !self.mem.uncommitted(*p) { // root page is clean - return Ok(()); + return Ok(root); } *checksum = self.finalize_dirty_checksums_helper(*p)?; self.root = root; } - Ok(()) + Ok(root) } fn finalize_dirty_checksums_helper(&mut self, page_number: PageNumber) -> Result { @@ -353,7 +353,7 @@ impl BtreeMut<'_, K, V> { .verify_checksum() } - pub(crate) fn finalize_dirty_checksums(&mut self) -> Result { + pub(crate) fn finalize_dirty_checksums(&mut self) -> Result> { let mut tree = UntypedBtreeMut::new( self.get_root(), self.mem.clone(), @@ -361,9 +361,8 @@ impl BtreeMut<'_, K, V> { K::fixed_width(), V::fixed_width(), ); - tree.finalize_dirty_checksums()?; - self.root = tree.get_root(); - Ok(()) + self.root = tree.finalize_dirty_checksums()?; + Ok(self.root) } #[allow(dead_code)] diff --git a/src/tree_store/table_tree.rs b/src/tree_store/table_tree.rs index fc964563..fdef1ced 100644 --- a/src/tree_store/table_tree.rs +++ b/src/tree_store/table_tree.rs @@ -389,7 +389,7 @@ impl<'txn> TableTreeMut<'txn> { Ok(true) } - pub(crate) fn flush_table_root_updates(&mut self) -> Result> { + pub(crate) fn flush_table_root_updates(&mut self) -> Result<&mut Self> { for (name, (new_root, new_length)) in self.pending_table_updates.drain() { // Bypass .get_table() since the table types are dynamic let mut definition = self.tree.get(&name.as_str())?.unwrap().value(); @@ -418,8 +418,7 @@ impl<'txn> TableTreeMut<'txn> { fixed_key_size, fixed_value_size, ); - tree.finalize_dirty_checksums()?; - *table_root = tree.get_root(); + *table_root = tree.finalize_dirty_checksums()?; *table_length = new_length; } InternalTableDefinition::Multimap { @@ -440,8 +439,11 @@ impl<'txn> TableTreeMut<'txn> { } self.tree.insert(&name.as_str(), &definition)?; } - self.tree.finalize_dirty_checksums()?; - Ok(self.tree.get_root()) + Ok(self) + } + + pub(crate) fn finalize_dirty_checksums(&mut self) -> Result> { + self.tree.finalize_dirty_checksums() } // 
root_page: the root of the master table From 810ba8c6cbbee8d015136e2b014ee9655eb68b58 Mon Sep 17 00:00:00 2001 From: Michael Constant Date: Wed, 30 Oct 2024 02:48:02 -0700 Subject: [PATCH 6/6] Add set_quick_repair() This adds a new quick-repair mode, which gives instant recovery after a crash at the cost of slower commits. To make this work, each commit with quick-repair enabled needs to save the allocator state somewhere. We can't use the region headers, because we'd be overwriting them in place; we might crash partway through the overwrite, and then we'd need a full repair. So we instead save the allocator state to a new table in the system tree. Writing to the table is slightly tricky, because it needs to be done without allocating (see below), but other than that it's a perfectly ordinary transactional write with all the usual guarantees. The other requirement to avoid full repair is knowing whether the last transaction used 2-phase commit. For this, we add a new two_phase_commit bit to the god byte, which is always updated atomically along with swapping the primary bit. Old redb versions will ignore the new flag when reading and clear it when writing, which is exactly what we want. This turns out to also fix a longstanding bug where 2-phase commit hasn't been providing any security benefit at all. The checksum forgery attack described in the documentation for 1-phase commit actually works equally well against 2-phase commit! The problem is that even though 2-phase commit guarantees the primary is valid, redb ignores the primary flag when repairing. It always picks whichever commit slot is newer, as long as the checksum is valid. So if you crash partway through a commit, it'll try to recover using the partially-written secondary rather than the fully-written primary, regardless of the commit strategy. The fix for this is exactly the two_phase_commit bit described above. After a crash, we check whether the last transaction used 2-phase commit; if so, we only look at the primary (which is guaranteed to be valid) and ignore the secondary. Quick-repair needs this check anyway for safety, so we get the bug fix for free. To write to the allocator state table without allocating, I've introduced a new insert_inplace() function. It's similar to insert_reserve(), but more general and maybe simpler. To use it, you have to first do an ordinary insert() with your desired key and a value of the appropriate length; then later in the same transaction you can call insert_inplace() to replace the value with a new one. Unlike insert_reserve(), this works with values that don't implement MutInPlaceValue, and it lets you hold multiple reservations simultaneously. insert_inplace() could be safely exposed to users, but I don't think there's any reason to. Since it doesn't give you a mutable reference, there's no benefit over insert() unless you're storing data that cares about its own position in the database. So for now it's private, and I haven't bothered making a new error type for it; it just panics if you don't satisfy the preconditions. The fuzzer is perfect for testing quick-repair, because it can simulate a crash, reopen the database (using quick-repair if possible), and then verify that the resulting allocator state exactly matches what would happen if it ran a full repair. I've modified the fuzzer to generate quick-repair commits in addition to ordinary commits. 
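
In code, the reserve-then-overwrite pattern looks roughly like this (an illustrative
sketch, not an excerpt from this patch; assume `tree` is the transaction's
AllocatorStateTree and `serialized` holds the final allocator state bytes):

    // Pass 1: a plain insert(), which is allowed to allocate pages. Only the
    // length of the value matters here; the bytes are placeholders.
    tree.insert(
        &AllocatorStateKey::RegionTracker,
        &vec![0u8; serialized.len()].as_slice(),
    )?;

    // ... free pages only through the transactional freed-tree ...

    // Pass 2: insert_inplace() with the same key overwrites the placeholder
    // without allocating or freeing anything, so the allocator state we just
    // serialized -- which describes exactly this page layout -- stays accurate.
    tree.insert_inplace(&AllocatorStateKey::RegionTracker, &serialized.as_slice())?;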
--- docs/design.md | 29 ++- fuzz/fuzz_targets/common.rs | 1 + fuzz/fuzz_targets/fuzz_redb.rs | 1 + src/db.rs | 90 ++++++-- src/transactions.rs | 169 ++++++++++++--- src/tree_store/btree.rs | 17 ++ src/tree_store/btree_mutator.rs | 45 +++- src/tree_store/page_store/header.rs | 54 ++++- src/tree_store/page_store/page_manager.rs | 238 +++++++++++++++++----- src/tree_store/table_tree.rs | 39 ++++ src/tree_store/table_tree_base.rs | 4 + 11 files changed, 588 insertions(+), 99 deletions(-) diff --git a/docs/design.md b/docs/design.md index 882beb6c..2e1b4b16 100644 --- a/docs/design.md +++ b/docs/design.md @@ -88,12 +88,16 @@ controls which transaction pointer is the primary. `magic number` must be set to the ASCII letters 'redb' followed by 0x1A, 0x0A, 0xA9, 0x0D, 0x0A. This sequence is inspired by the PNG magic number. -`god byte`, so named because this byte controls the state of the entire database, is a bitfield containing two flags: +`god byte`, so named because this byte controls the state of the entire database, is a bitfield containing three flags: * first bit: `primary_bit` flag which indicates whether transaction slot 0 or transaction slot 1 contains the latest commit. - redb relies on the fact that this is a single bit to perform atomic commits. -* second bit: `recovery_required` flag, if set then the recovery process must be run when opening the database. - During the recovery process, the region tracker and regional allocator states -- described below -- are reconstructed - by walking the btree from all active roots. +* second bit: `recovery_required` flag, if set then the recovery process must be run when opening the database. This can be + a full repair, in which the region tracker and regional allocator states -- described below -- are reconstructed by walking + the btree from all active roots, or a quick-repair, in which the state is simply loaded from the allocator state table. +* third bit: `two_phase_commit` flag, which indicates whether the transaction in the primary slot was written using 2-phase + commit. If so, the primary slot is guaranteed to be valid, and repair won't look at the secondary slot. This flag is always + updated atomically along with the primary bit. + +redb relies on the fact that this is a single byte to perform atomic commits. `page size` is the size of a redb page in bytes @@ -155,7 +159,9 @@ changed during an upgrade. ### Region tracker The region tracker is an array of `BtreeBitmap`s that tracks the page orders which are free in each region. -It is stored in a page in the data section of a region: +There are two different places it can be stored: on shutdown, it's written to a page in the data section of +a region, and when making a commit with quick-repair enabled, it's written to an entry in the allocator state +table. The former is valid only after a clean shutdown; the latter is usable even after a crash. ``` <-------------------------------------------- 8 bytes -------------------------------------------> ================================================================================================== @@ -216,6 +222,11 @@ range has been allocated * n bytes: free index data * n bytes: allocated data +Like the region tracker, there are two different places where the regional allocator state can be +stored. On shutdown, it's written to the region header as described above, and when making a commit +with quick-repair enabled, it's written to an entry in the allocator state table. 
The former is valid +only after a clean shutdown; the latter is usable even after a crash. + ``` <-------------------------------------------- 8 bytes -------------------------------------------> ================================================================================================== @@ -461,6 +472,12 @@ To repair the database after an unclean shutdown we must: 2) Update the allocator state, so that it is consistent with all the database roots in the above transaction +If the last commit before the crash had quick-repair enabled, then these are both trivial. The +primary commit slot is guaranteed to be valid, because it was written using 2-phase commit, and +the corresponding allocator state is stored in the allocator state table. + +Otherwise, we need to perform a full repair: + For (1), if the primary commit slot is invalid we switch to the secondary slot. For (2), we rebuild the allocator state by walking the following trees and marking all referenced diff --git a/fuzz/fuzz_targets/common.rs b/fuzz/fuzz_targets/common.rs index faec1d70..ac0d15cb 100644 --- a/fuzz/fuzz_targets/common.rs +++ b/fuzz/fuzz_targets/common.rs @@ -164,6 +164,7 @@ pub(crate) enum FuzzOperation { pub(crate) struct FuzzTransaction { pub ops: Vec, pub durable: bool, + pub quick_repair: bool, pub commit: bool, pub create_ephemeral_savepoint: bool, pub create_persistent_savepoint: bool, diff --git a/fuzz/fuzz_targets/fuzz_redb.rs b/fuzz/fuzz_targets/fuzz_redb.rs index 599784d0..34c56cab 100644 --- a/fuzz/fuzz_targets/fuzz_redb.rs +++ b/fuzz/fuzz_targets/fuzz_redb.rs @@ -583,6 +583,7 @@ fn exec_table_crash_support(config: &FuzzConfig, apply: fn(WriteTransa if !transaction.durable { txn.set_durability(Durability::None); } + txn.set_quick_repair(transaction.quick_repair); let mut counter_table = txn.open_table(COUNTER_TABLE).unwrap(); let uncommitted_id = txn_id as u64 + 1; counter_table.insert((), uncommitted_id)?; diff --git a/src/db.rs b/src/db.rs index e2ba23fa..c0558e5b 100644 --- a/src/db.rs +++ b/src/db.rs @@ -19,7 +19,9 @@ use std::sync::{Arc, Mutex}; use crate::error::TransactionError; use crate::sealed::Sealed; -use crate::transactions::SAVEPOINT_TABLE; +use crate::transactions::{ + AllocatorStateKey, AllocatorStateTree, ALLOCATOR_STATE_TABLE_NAME, SAVEPOINT_TABLE, +}; use crate::tree_store::file_backend::FileBackend; #[cfg(feature = "logging")] use log::{debug, info, warn}; @@ -429,7 +431,9 @@ impl Database { return Err(CompactionError::TransactionInProgress); } // Commit to free up any pending free pages - // Use 2-phase commit to avoid any possible security issues. Plus this compaction is going to be so slow that it doesn't matter + // Use 2-phase commit to avoid any possible security issues. Plus this compaction is going to be so slow that it doesn't matter. + // Once https://github.com/cberner/redb/issues/829 is fixed, we should upgrade this to use quick-repair -- that way the user + // can cancel the compaction without requiring a full repair afterwards let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?; if txn.list_persistent_savepoints()?.next().is_some() { return Err(CompactionError::PersistentSavepointExists); @@ -609,6 +613,12 @@ impl Database { repair_callback: &(dyn Fn(&mut RepairSession) + 'static), ) -> Result<[Option; 3], DatabaseError> { if !Self::verify_primary_checksums(mem.clone())? 
{ + if mem.used_two_phase_commit() { + return Err(DatabaseError::Storage(StorageError::Corrupted( + "Primary is corrupted despite 2-phase commit".to_string(), + ))); + } + // 0.3 because the repair takes 3 full scans and the first is done now let mut handle = RepairSession::new(0.3); repair_callback(&mut handle); @@ -701,23 +711,31 @@ impl Database { )?; let mut mem = Arc::new(mem); if mem.needs_repair()? { - #[cfg(feature = "logging")] - warn!("Database {:?} not shutdown cleanly. Repairing", &file_path); - let mut handle = RepairSession::new(0.0); - repair_callback(&mut handle); - if handle.aborted() { - return Err(DatabaseError::RepairAborted); + // If the last transaction used 2-phase commit and updated the allocator state table, then + // we can just load the allocator state from there. Otherwise, we need a full repair + if Self::try_quick_repair(mem.clone())? { + #[cfg(feature = "logging")] + info!("Quick-repair successful, full repair not needed"); + } else { + #[cfg(feature = "logging")] + warn!("Database {:?} not shutdown cleanly. Repairing", &file_path); + let mut handle = RepairSession::new(0.0); + repair_callback(&mut handle); + if handle.aborted() { + return Err(DatabaseError::RepairAborted); + } + let [data_root, system_root, freed_root] = + Self::do_repair(&mut mem, repair_callback)?; + let next_transaction_id = mem.get_last_committed_transaction_id()?.next(); + mem.commit( + data_root, + system_root, + freed_root, + next_transaction_id, + false, + true, + )?; } - let [data_root, system_root, freed_root] = Self::do_repair(&mut mem, repair_callback)?; - let next_transaction_id = mem.get_last_committed_transaction_id()?.next(); - mem.commit( - data_root, - system_root, - freed_root, - next_transaction_id, - false, - true, - )?; } mem.begin_writable()?; @@ -752,6 +770,42 @@ impl Database { Ok(db) } + // Returns true if quick-repair was successful, or false if a full repair is needed + fn try_quick_repair(mem: Arc) -> Result { + // Quick-repair is only possible if the primary was written using 2-phase commit + if !mem.used_two_phase_commit() { + return Ok(false); + } + + // See if the allocator state table is present in the system table tree + let fake_freed_pages = Arc::new(Mutex::new(vec![])); + let system_table_tree = TableTreeMut::new( + mem.get_system_root(), + Arc::new(TransactionGuard::fake()), + mem.clone(), + fake_freed_pages.clone(), + ); + let Some(allocator_state_table) = system_table_tree + .get_table::(ALLOCATOR_STATE_TABLE_NAME, TableType::Normal) + .map_err(|e| e.into_storage_error_or_corrupted("Unexpected TableError"))? + else { + return Ok(false); + }; + + // Load the allocator state from the table + let InternalTableDefinition::Normal { table_root, .. 
} = allocator_state_table else { + unreachable!(); + }; + let tree = AllocatorStateTree::new( + table_root, + Arc::new(TransactionGuard::fake()), + mem.clone(), + fake_freed_pages, + ); + + mem.try_load_allocator_state(&tree) + } + fn allocate_read_transaction(&self) -> Result { let id = self .transaction_tracker diff --git a/src/transactions.rs b/src/transactions.rs index 63c9e8e6..c2bbe47b 100644 --- a/src/transactions.rs +++ b/src/transactions.rs @@ -13,8 +13,8 @@ use crate::types::{Key, Value}; use crate::{ AccessGuard, MultimapTable, MultimapTableDefinition, MultimapTableHandle, Range, ReadOnlyMultimapTable, ReadOnlyTable, Result, Savepoint, SavepointError, StorageError, Table, - TableDefinition, TableError, TableHandle, TransactionError, UntypedMultimapTableHandle, - UntypedTableHandle, + TableDefinition, TableError, TableHandle, TransactionError, TypeName, + UntypedMultimapTableHandle, UntypedTableHandle, }; #[cfg(feature = "logging")] use log::{debug, warn}; @@ -23,6 +23,7 @@ use std::cmp::min; use std::collections::{BTreeMap, HashMap, HashSet}; use std::fmt::{Debug, Display, Formatter}; use std::marker::PhantomData; +use std::mem::size_of; use std::ops::RangeBounds; #[cfg(any(test, fuzzing))] use std::ops::RangeFull; @@ -35,6 +36,70 @@ const NEXT_SAVEPOINT_TABLE: SystemTableDefinition<(), SavepointId> = SystemTableDefinition::new("next_savepoint_id"); pub(crate) const SAVEPOINT_TABLE: SystemTableDefinition = SystemTableDefinition::new("persistent_savepoints"); +// The allocator state table is stored in the system table tree, but it's accessed using +// raw btree operations rather than open_system_table(), so there's no SystemTableDefinition +pub(crate) const ALLOCATOR_STATE_TABLE_NAME: &str = "allocator_state"; +pub(crate) type AllocatorStateTree<'a> = BtreeMut<'a, AllocatorStateKey, &'static [u8]>; + +#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug)] +pub(crate) enum AllocatorStateKey { + Region(u32), + RegionTracker, + TransactionId, +} + +impl Value for AllocatorStateKey { + type SelfType<'a> = Self; + type AsBytes<'a> = [u8; 1 + size_of::()]; + + fn fixed_width() -> Option { + Some(1 + size_of::()) + } + + fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a> + where + Self: 'a, + { + match data[0] { + 0 => Self::Region(u32::from_le_bytes(data[1..].try_into().unwrap())), + 1 => Self::RegionTracker, + 2 => Self::TransactionId, + _ => unreachable!(), + } + } + + fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a> + where + Self: 'a, + Self: 'b, + { + let mut result = Self::AsBytes::default(); + match value { + Self::Region(region) => { + result[0] = 0; + result[1..].copy_from_slice(&u32::to_le_bytes(*region)); + } + Self::RegionTracker => { + result[0] = 1; + } + Self::TransactionId => { + result[0] = 2; + } + } + + result + } + + fn type_name() -> TypeName { + TypeName::internal("redb::AllocatorStateKey") + } +} + +impl Key for AllocatorStateKey { + fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering { + Self::from_bytes(data1).cmp(&Self::from_bytes(data2)) + } +} pub struct SystemTableDefinition<'a, K: Key + 'static, V: Value + 'static> { name: &'a str, @@ -423,6 +488,7 @@ pub struct WriteTransaction { dirty: AtomicBool, durability: InternalDurability, two_phase_commit: bool, + quick_repair: bool, // Persistent savepoints created during this transaction created_persistent_savepoints: Mutex>, deleted_persistent_savepoints: Mutex>, @@ -481,6 +547,7 @@ impl WriteTransaction { dirty: AtomicBool::new(false), durability: 
InternalDurability::Immediate, two_phase_commit: false, + quick_repair: false, created_persistent_savepoints: Mutex::new(Default::default()), deleted_persistent_savepoints: Mutex::new(vec![]), }) @@ -930,6 +997,20 @@ impl WriteTransaction { self.two_phase_commit = enabled; } + /// Enable or disable quick-repair (defaults to disabled) + /// + /// By default, when reopening the database after a crash, redb needs to do a full repair. + /// This involves walking the entire database to verify the checksums and reconstruct the + /// allocator state, so it can be very slow if the database is large. + /// + /// Alternatively, you can enable quick-repair. In this mode, redb saves the allocator state + /// as part of each commit (so it doesn't need to be reconstructed), and enables 2-phase commit + /// (which guarantees that the primary commit slot is valid without needing to look at the + /// checksums). This means commits are slower, but recovery after a crash is almost instant. + pub fn set_quick_repair(&mut self, enabled: bool) { + self.quick_repair = enabled; + } + /// Open the given table /// /// The table will be created if it does not exist @@ -1023,10 +1104,15 @@ impl WriteTransaction { } fn commit_inner(&mut self) -> Result<(), CommitError> { + // Quick-repair requires 2-phase commit + if self.quick_repair { + self.two_phase_commit = true; + } + #[cfg(feature = "logging")] debug!( - "Committing transaction id={:?} with durability={:?} two_phase={}", - self.transaction_id, self.durability, self.two_phase_commit + "Committing transaction id={:?} with durability={:?} two_phase={} quick_repair={}", + self.transaction_id, self.durability, self.two_phase_commit, self.quick_repair ); match self.durability { InternalDurability::None => self.non_durable_commit()?, @@ -1089,6 +1175,7 @@ impl WriteTransaction { .transaction_tracker .oldest_live_read_transaction() .map_or(self.transaction_id, |x| x.next()); + self.process_freed_pages(free_until_transaction)?; let user_root = self .tables @@ -1098,21 +1185,54 @@ impl WriteTransaction { .flush_table_root_updates()? .finalize_dirty_checksums()?; - let system_root = self - .system_tables - .lock() - .unwrap() - .table_tree - .flush_table_root_updates()? - .finalize_dirty_checksums()?; + let mut system_tables = self.system_tables.lock().unwrap(); + let system_tree = system_tables.table_tree.flush_table_root_updates()?; + system_tree + .delete_table(ALLOCATOR_STATE_TABLE_NAME, TableType::Normal) + .map_err(|e| e.into_storage_error_or_corrupted("Unexpected TableError"))?; + + if self.quick_repair { + system_tree.create_table_and_flush_table_root( + ALLOCATOR_STATE_TABLE_NAME, + |tree: &mut AllocatorStateTree| { + let mut pagination_counter = 0; + + loop { + let num_regions = self + .mem + .reserve_allocator_state(tree, self.transaction_id)?; + + // We can't free pages after the commit, because that would invalidate our + // saved allocator state. Everything needs to go through the transactional + // free mechanism + self.store_freed_pages(&mut pagination_counter, true)?; + + if self.mem.try_save_allocator_state(tree, num_regions)? { + return Ok(()); + } + + // Clear out the table before retrying, just in case the number of regions + // has somehow shrunk. Don't use retain_in() for this, since it doesn't + // free the pages immediately -- we need to reuse those pages to guarantee + // that our retry loop will eventually terminate + while let Some(guards) = tree.last()? 
{ + let key = guards.0.value(); + drop(guards); + tree.remove(&key)?; + } + } + }, + )?; + } else { + // If a savepoint exists it might reference the freed-tree, since it holds a reference to the + // root of the freed-tree. Therefore, we must use the transactional free mechanism to free + // those pages. If there are no save points then these can be immediately freed, which is + // done at the end of this function. + let savepoint_exists = self.transaction_tracker.any_savepoint_exists(); + self.store_freed_pages(&mut 0, savepoint_exists)?; + } - self.process_freed_pages(free_until_transaction)?; - // If a savepoint exists it might reference the freed-tree, since it holds a reference to the - // root of the freed-tree. Therefore, we must use the transactional free mechanism to free - // those pages. If there are no save points then these can be immediately freed, which is - // done at the end of this function. - let savepoint_exists = self.transaction_tracker.any_savepoint_exists(); - self.store_freed_pages(savepoint_exists)?; + let system_root = system_tree.finalize_dirty_checksums()?; // Finalize freed table checksums, before doing the final commit let freed_root = self.freed_tree.lock().unwrap().finalize_dirty_checksums()?; @@ -1158,7 +1278,7 @@ impl WriteTransaction { // Store all freed pages for a future commit(), since we can't free pages during a // non-durable commit (it's non-durable, so could be rolled back anytime in the future) - self.store_freed_pages(true)?; + self.store_freed_pages(&mut 0, true)?; // Finalize all checksums, before doing the final commit let freed_root = self.freed_tree.lock().unwrap().finalize_dirty_checksums()?; @@ -1264,10 +1384,13 @@ impl WriteTransaction { Ok(()) } - fn store_freed_pages(&mut self, include_post_commit_free: bool) -> Result { + fn store_freed_pages( + &self, + pagination_counter: &mut u64, + include_post_commit_free: bool, + ) -> Result { assert_eq!(PageNumber::serialized_size(), 8); // We assume below that PageNumber is length 8 - let mut pagination_counter = 0u64; let mut freed_tree = self.freed_tree.lock().unwrap(); if include_post_commit_free { // Move all the post-commit pages that came from the freed-tree. These need to be stored @@ -1282,7 +1405,7 @@ impl WriteTransaction { let buffer_size = FreedPageList::required_bytes(chunk_size); let key = FreedTableKey { transaction_id: self.transaction_id.raw_id(), - pagination_id: pagination_counter, + pagination_id: *pagination_counter, }; let mut access_guard = freed_tree.insert_reserve(&key, buffer_size.try_into().unwrap())?; @@ -1295,7 +1418,7 @@ impl WriteTransaction { } drop(access_guard); - pagination_counter += 1; + *pagination_counter += 1; if include_post_commit_free { // Move all the post-commit pages that came from the freed-tree. These need to be stored diff --git a/src/tree_store/btree.rs b/src/tree_store/btree.rs index ac3bc784..541e9e3d 100644 --- a/src/tree_store/btree.rs +++ b/src/tree_store/btree.rs @@ -434,6 +434,23 @@ impl BtreeMut<'_, K, V> { Ok(old_value) } + // Insert without allocating or freeing any pages. This requires that you've previously + // inserted the same key, with a value of at least the same serialized length, earlier + // in the same transaction. 
If those preconditions aren't satisfied, insert_inplace() + // will panic; it won't allocate under any circumstances + pub(crate) fn insert_inplace( + &mut self, + key: &K::SelfType<'_>, + value: &V::SelfType<'_>, + ) -> Result<()> { + let mut fake_freed_pages = vec![]; + let mut operation = + MutateHelper::::new(&mut self.root, self.mem.clone(), fake_freed_pages.as_mut()); + operation.insert_inplace(key, value)?; + assert!(fake_freed_pages.is_empty()); + Ok(()) + } + pub(crate) fn remove(&mut self, key: &K::SelfType<'_>) -> Result>> { #[cfg(feature = "logging")] trace!("Btree(root={:?}): Deleting {:?}", &self.root, key); diff --git a/src/tree_store/btree_mutator.rs b/src/tree_store/btree_mutator.rs index 7c50335c..3d1790cd 100644 --- a/src/tree_store/btree_mutator.rs +++ b/src/tree_store/btree_mutator.rs @@ -5,7 +5,7 @@ use crate::tree_store::btree_base::{ use crate::tree_store::btree_mutator::DeletionResult::{ DeletedBranch, DeletedLeaf, PartialBranch, PartialLeaf, Subtree, }; -use crate::tree_store::page_store::{Page, PageImpl}; +use crate::tree_store::page_store::{Page, PageImpl, PageMut}; use crate::tree_store::{ AccessGuardMut, BtreeHeader, PageNumber, RawLeafBuilder, TransactionalMemory, }; @@ -496,6 +496,49 @@ impl<'a, 'b, K: Key, V: Value> MutateHelper<'a, 'b, K, V> { }) } + pub(crate) fn insert_inplace( + &mut self, + key: &K::SelfType<'_>, + value: &V::SelfType<'_>, + ) -> Result<()> { + assert!(self.modify_uncommitted); + let header = self.root.expect("Key not found (tree is empty)"); + self.insert_inplace_helper( + self.mem.get_page_mut(header.root)?, + K::as_bytes(key).as_ref(), + V::as_bytes(value).as_ref(), + )?; + *self.root = Some(BtreeHeader::new(header.root, DEFERRED, header.length)); + Ok(()) + } + + fn insert_inplace_helper(&mut self, mut page: PageMut, key: &[u8], value: &[u8]) -> Result<()> { + assert!(self.mem.uncommitted(page.get_page_number())); + + let node_mem = page.memory(); + match node_mem[0] { + LEAF => { + let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width()); + let (position, found) = accessor.position::(key); + assert!(found); + let old_len = accessor.entry(position).unwrap().value().len(); + assert!(value.len() <= old_len); + let mut mutator = LeafMutator::new(&mut page, K::fixed_width(), V::fixed_width()); + mutator.insert(position, true, key, value); + } + BRANCH => { + let accessor = BranchAccessor::new(&page, K::fixed_width()); + let (child_index, child_page) = accessor.child_for_key::(key); + self.insert_inplace_helper(self.mem.get_page_mut(child_page)?, key, value)?; + let mut mutator = BranchMutator::new(&mut page); + mutator.write_child_page(child_index, child_page, DEFERRED); + } + _ => unreachable!(), + } + + Ok(()) + } + fn delete_leaf_helper( &mut self, page: PageImpl, diff --git a/src/tree_store/page_store/header.rs b/src/tree_store/page_store/header.rs index 81820fd2..351a2db2 100644 --- a/src/tree_store/page_store/header.rs +++ b/src/tree_store/page_store/header.rs @@ -5,7 +5,7 @@ use crate::tree_store::page_store::page_manager::{ xxh3_checksum, FILE_FORMAT_VERSION1, FILE_FORMAT_VERSION2, }; use crate::tree_store::{Checksum, PageNumber}; -use crate::{DatabaseError, StorageError}; +use crate::{DatabaseError, Result, StorageError}; use std::mem::size_of; // Database layout: @@ -58,6 +58,7 @@ pub(super) const DB_HEADER_SIZE: usize = TRANSACTION_1_OFFSET + TRANSACTION_SIZE // God byte flags const PRIMARY_BIT: u8 = 1; const RECOVERY_REQUIRED: u8 = 2; +const TWO_PHASE_COMMIT: u8 = 4; // Structure of each commit 
diff --git a/src/tree_store/page_store/header.rs b/src/tree_store/page_store/header.rs
index 81820fd2..351a2db2 100644
--- a/src/tree_store/page_store/header.rs
+++ b/src/tree_store/page_store/header.rs
@@ -5,7 +5,7 @@ use crate::tree_store::page_store::page_manager::{
     xxh3_checksum, FILE_FORMAT_VERSION1, FILE_FORMAT_VERSION2,
 };
 use crate::tree_store::{Checksum, PageNumber};
-use crate::{DatabaseError, StorageError};
+use crate::{DatabaseError, Result, StorageError};
 use std::mem::size_of;
 
 // Database layout:
@@ -58,6 +58,7 @@ pub(super) const DB_HEADER_SIZE: usize = TRANSACTION_1_OFFSET + TRANSACTION_SIZE
 // God byte flags
 const PRIMARY_BIT: u8 = 1;
 const RECOVERY_REQUIRED: u8 = 2;
+const TWO_PHASE_COMMIT: u8 = 4;
 
 // Structure of each commit slot
 const VERSION_OFFSET: usize = 0;
@@ -95,6 +96,7 @@ pub(super) struct HeaderRepairInfo {
 pub(super) struct DatabaseHeader {
     primary_slot: usize,
     pub(super) recovery_required: bool,
+    pub(super) two_phase_commit: bool,
     page_size: u32,
     region_header_pages: u32,
     region_max_data_pages: u32,
@@ -120,6 +122,7 @@ impl DatabaseHeader {
         Self {
             primary_slot: 0,
             recovery_required: true,
+            two_phase_commit: false,
             page_size: layout.full_region_layout().page_size(),
             region_header_pages: layout.full_region_layout().get_header_pages(),
             region_max_data_pages: layout.full_region_layout().num_pages(),
@@ -194,12 +197,57 @@ impl DatabaseHeader {
         self.primary_slot ^= 1;
     }
 
+    // Figure out which slot to use as the primary when starting a repair. The repair process might
+    // still switch to the other slot later, if the tree checksums turn out to be invalid.
+    //
+    // Returns true if we picked the original primary, or false if we swapped
+    pub(super) fn pick_primary_for_repair(
+        &mut self,
+        repair_info: HeaderRepairInfo,
+    ) -> Result<bool> {
+        // If the primary was written using 2-phase commit, it's guaranteed to be valid. Don't look
+        // at the secondary; even if it happens to have a valid checksum, Durability::Paranoid means
+        // we can't trust it
+        if self.two_phase_commit {
+            if repair_info.primary_corrupted {
+                return Err(StorageError::Corrupted(
+                    "Primary is corrupted despite 2-phase commit".to_string(),
+                ));
+            }
+            return Ok(true);
+        }
+
+        // Pick whichever slot is newer, assuming it has a valid checksum. This handles an edge case
+        // where we crash during fsync(), and the only data that got written to disk was the god byte
+        // update swapping the primary -- in that case, the primary contains a valid but out-of-date
+        // transaction, so we need to load from the secondary instead
+        if repair_info.primary_corrupted {
+            if repair_info.secondary_corrupted {
+                return Err(StorageError::Corrupted(
+                    "Both commit slots are corrupted".to_string(),
+                ));
+            }
+            self.swap_primary_slot();
+            return Ok(false);
+        }
+
+        let secondary_newer =
+            self.secondary_slot().transaction_id > self.primary_slot().transaction_id;
+        if secondary_newer && !repair_info.secondary_corrupted {
+            self.swap_primary_slot();
+            return Ok(false);
+        }
+
+        Ok(true)
+    }
+
     // TODO: consider returning an Err with the repair info
     pub(super) fn from_bytes(data: &[u8]) -> Result<(Self, HeaderRepairInfo), DatabaseError> {
         let invalid_magic_number = data[..MAGICNUMBER.len()] != MAGICNUMBER;
         let primary_slot = usize::from(data[GOD_BYTE_OFFSET] & PRIMARY_BIT != 0);
         let recovery_required = (data[GOD_BYTE_OFFSET] & RECOVERY_REQUIRED) != 0;
+        let two_phase_commit = (data[GOD_BYTE_OFFSET] & TWO_PHASE_COMMIT) != 0;
         let page_size = get_u32(&data[PAGE_SIZE_OFFSET..]);
         let region_header_pages = get_u32(&data[REGION_HEADER_PAGES_OFFSET..]);
         let region_max_data_pages = get_u32(&data[REGION_MAX_DATA_PAGES_OFFSET..]);
@@ -226,6 +274,7 @@ impl DatabaseHeader {
         let result = Self {
             primary_slot,
             recovery_required,
+            two_phase_commit,
             page_size,
             region_header_pages,
             region_max_data_pages,
@@ -251,6 +300,9 @@ impl DatabaseHeader {
         if self.recovery_required {
             result[GOD_BYTE_OFFSET] |= RECOVERY_REQUIRED;
         }
+        if self.two_phase_commit {
+            result[GOD_BYTE_OFFSET] |= TWO_PHASE_COMMIT;
+        }
         result[PAGE_SIZE_OFFSET..(PAGE_SIZE_OFFSET + size_of::<u32>())]
             .copy_from_slice(&self.page_size.to_le_bytes());
         result[REGION_HEADER_PAGES_OFFSET..(REGION_HEADER_PAGES_OFFSET + size_of::<u32>())]
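The slot-selection policy in pick_primary_for_repair() reduces to three cases: trust the primary unconditionally after a 2-phase commit, fall back to the secondary when the primary is corrupted, and otherwise prefer whichever valid slot is newer. A standalone sketch of that decision table; the Slot type and pick_primary function are illustrative, not redb's API (Some(true) means keep the primary, Some(false) means swap, None means unrecoverable):

    struct Slot {
        transaction_id: u64,
        corrupted: bool,
    }

    fn pick_primary(primary: &Slot, secondary: &Slot, two_phase: bool) -> Option<bool> {
        // A 2-phase commit guarantees the primary; never trust the secondary.
        if two_phase {
            return if primary.corrupted { None } else { Some(true) };
        }
        if primary.corrupted {
            return if secondary.corrupted { None } else { Some(false) };
        }
        // Crash-during-fsync edge case: only the god-byte swap reached disk,
        // so the "primary" is valid but older than the secondary.
        if secondary.transaction_id > primary.transaction_id && !secondary.corrupted {
            return Some(false);
        }
        Some(true)
    }

    fn main() {
        let older = Slot { transaction_id: 5, corrupted: false };
        let newer = Slot { transaction_id: 6, corrupted: false };
        assert_eq!(pick_primary(&older, &newer, false), Some(false));
        assert_eq!(pick_primary(&older, &newer, true), Some(true));
    }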
diff --git a/src/tree_store/page_store/page_manager.rs b/src/tree_store/page_store/page_manager.rs
index 80fd6fab..51e0b0aa 100644
--- a/src/tree_store/page_store/page_manager.rs
+++ b/src/tree_store/page_store/page_manager.rs
@@ -1,4 +1,5 @@
 use crate::transaction_tracker::TransactionId;
+use crate::transactions::{AllocatorStateKey, AllocatorStateTree};
 use crate::tree_store::btree_base::{BtreeHeader, Checksum};
 use crate::tree_store::page_store::base::{PageHint, MAX_PAGE_INDEX};
 use crate::tree_store::page_store::buddy_allocator::BuddyAllocator;
@@ -11,7 +12,7 @@ use crate::tree_store::{Page, PageNumber};
 use crate::StorageBackend;
 use crate::{DatabaseError, Result, StorageError};
 #[cfg(feature = "logging")]
-use log::warn;
+use log::{debug, warn};
 use std::cmp::{max, min};
 #[cfg(debug_assertions)]
 use std::collections::HashMap;
@@ -189,6 +190,7 @@ impl TransactionalMemory {
         );
 
         header.recovery_required = false;
+        header.two_phase_commit = true;
         storage
             .write(0, DB_HEADER_SIZE, true)?
             .mem_mut()
@@ -221,18 +223,7 @@ impl TransactionalMemory {
                 region_max_pages,
                 page_size.try_into().unwrap(),
             ));
-            if repair_info.primary_corrupted {
-                header.swap_primary_slot();
-            } else {
-                // If the secondary is a valid commit, verify that the primary is newer. This handles an edge case where:
-                // * the primary bit is flipped to the secondary
-                // * a crash occurs during fsync, such that no other data is written out to the secondary. meaning that it contains a valid, but out of date transaction
-                let secondary_newer =
-                    header.secondary_slot().transaction_id > header.primary_slot().transaction_id;
-                if secondary_newer && !repair_info.secondary_corrupted {
-                    header.swap_primary_slot();
-                }
-            }
+            header.pick_primary_for_repair(repair_info)?;
             assert!(!repair_info.invalid_magic_number);
             storage
                 .write(0, DB_HEADER_SIZE, true)?
@@ -296,19 +287,8 @@ impl TransactionalMemory {
         // TODO: Also we should recheck the layout
         let mut was_clean = true;
         if header.recovery_required {
-            if repair_info.primary_corrupted {
-                header.swap_primary_slot();
+            if !header.pick_primary_for_repair(repair_info)? {
                 was_clean = false;
-            } else {
-                // If the secondary is a valid commit, verify that the primary is newer. This handles an edge case where:
-                // * the primary bit is flipped to the secondary
-                // * a crash occurs during fsync, such that no other data is written out to the secondary. meaning that it contains a valid, but out of date transaction
-                let secondary_newer =
-                    header.secondary_slot().transaction_id > header.primary_slot().transaction_id;
-                if secondary_newer && !repair_info.secondary_corrupted {
-                    header.swap_primary_slot();
-                    was_clean = false;
-                }
-            }
             if repair_info.invalid_magic_number {
                 return Err(StorageError::Corrupted("Invalid magic number".to_string()).into());
@@ -339,6 +319,10 @@
         Ok(self.state.lock().unwrap().header.recovery_required)
     }
 
+    pub(crate) fn used_two_phase_commit(&self) -> bool {
+        self.state.lock().unwrap().header.two_phase_commit
+    }
+
     pub(crate) fn allocator_hash(&self) -> u128 {
         self.state.lock().unwrap().allocators.xxh3_hash()
     }
@@ -412,6 +396,130 @@
         result
     }
 
+    pub(crate) fn reserve_allocator_state(
+        &self,
+        tree: &mut AllocatorStateTree,
+        transaction_id: TransactionId,
+    ) -> Result<u32> {
+        let state = self.state.lock().unwrap();
+        let layout = state.header.layout();
+        let num_regions = layout.num_regions();
+        let region_header_len = layout.full_region_layout().get_header_pages()
+            * layout.full_region_layout().page_size();
+        let region_tracker_len = state.allocators.region_tracker.to_vec().len();
+        drop(state);
+
+        for i in 0..num_regions {
+            tree.insert(
+                &AllocatorStateKey::Region(i),
+                &vec![0; region_header_len as usize].as_ref(),
+            )?;
+        }
+
+        tree.insert(
+            &AllocatorStateKey::RegionTracker,
+            &vec![0; region_tracker_len].as_ref(),
+        )?;
+
+        tree.insert(
+            &AllocatorStateKey::TransactionId,
+            &transaction_id.raw_id().to_le_bytes().as_ref(),
+        )?;
+
+        Ok(num_regions)
+    }
+
+    // Returns true on success, or false if the number of regions has changed
+    pub(crate) fn try_save_allocator_state(
+        &self,
+        tree: &mut AllocatorStateTree,
+        num_regions: u32,
+    ) -> Result<bool> {
+        // Has the number of regions changed since reserve_allocator_state() was called?
+        if num_regions != self.state.lock().unwrap().header.layout().num_regions() {
+            return Ok(false);
+        }
+
+        for i in 0..num_regions {
+            let region_bytes =
+                &self.state.lock().unwrap().allocators.region_allocators[i as usize].to_vec();
+            tree.insert_inplace(&AllocatorStateKey::Region(i), &region_bytes.as_ref())?;
+        }
+
+        let region_tracker_bytes = self
+            .state
+            .lock()
+            .unwrap()
+            .allocators
+            .region_tracker
+            .to_vec();
+        tree.insert_inplace(
+            &AllocatorStateKey::RegionTracker,
+            &region_tracker_bytes.as_ref(),
+        )?;
+
+        Ok(true)
+    }
+
+    // Returns true on success, or false if the allocator state was stale (in which case we need
+    // to fall back to a full repair)
+    pub(crate) fn try_load_allocator_state(&self, tree: &AllocatorStateTree) -> Result<bool> {
+        // See if this is stale allocator state left over from a previous transaction. That won't
+        // happen during normal operation, since WriteTransaction::commit() always updates the
+        // allocator state table before calling TransactionalMemory::commit(), but there are also
+        // a few places where TransactionalMemory::commit() is called directly without using a
+        // WriteTransaction. When that happens, any existing allocator state table will be left
+        // in place but is no longer valid. (And even if there were no such calls today, it would
+        // be an easy mistake to make! So it's good that we check.)
+        let transaction_id = TransactionId::new(u64::from_le_bytes(
+            tree.get(&AllocatorStateKey::TransactionId)?
+                .unwrap()
+                .value()
+                .try_into()
+                .unwrap(),
+        ));
+        if transaction_id != self.get_last_committed_transaction_id()? {
+            #[cfg(feature = "logging")]
+            debug!("Ignoring stale allocator state from {:?}", transaction_id);
+            return Ok(false);
+        }
+
+        // Load the allocator state
+        let mut region_allocators = vec![];
+        for region in
+            tree.range(&(AllocatorStateKey::Region(0)..=AllocatorStateKey::Region(u32::MAX)))?
+        {
+            region_allocators.push(BuddyAllocator::from_bytes(region?.value()));
+        }
+
+        let region_tracker = RegionTracker::from_page(
+            tree.get(&AllocatorStateKey::RegionTracker)?
+                .unwrap()
+                .value(),
+        );
+
+        let mut state = self.state.lock().unwrap();
+        state.allocators = Allocators {
+            region_tracker,
+            region_allocators,
+        };
+
+        // Resize the allocators to match the current file size
+        let layout = state.header.layout();
+        state.allocators.resize_to(layout);
+        drop(state);
+
+        // Allocate a larger region tracker page if necessary. This also happens automatically on
+        // shutdown, but we do it here because we want our allocator state to exactly match the
+        // result of running a full repair
+        self.ensure_region_tracker_page()?;
+
+        self.state.lock().unwrap().header.recovery_required = false;
+        self.needs_recovery.store(false, Ordering::Release);
+
+        Ok(true)
+    }
+
     pub(crate) fn is_allocated(&self, page: PageNumber) -> bool {
         let state = self.state.lock().unwrap();
         let allocator = state.get_region(page.region);
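Taken together, reserve_allocator_state(), try_save_allocator_state(), and try_load_allocator_state() form a three-step protocol: reserve placeholders while allocation is still allowed, overwrite them in place once the allocator state is final, and on the next open trust the result only if it came from the last committed transaction. A compressed sketch of that flow, with a BTreeMap standing in for the AllocatorStateTree (all names are illustrative; in redb the phase-2 overwrite uses insert_inplace() so it cannot allocate):

    use std::collections::BTreeMap;

    #[derive(PartialEq, Eq, PartialOrd, Ord)]
    enum Key {
        Region(u32),
        TransactionId,
    }

    fn commit(table: &mut BTreeMap<Key, Vec<u8>>, txn_id: u64, bitmaps: &[Vec<u8>]) {
        // Phase 1 (reserve): zero-filled placeholders, sized like the real data.
        for (i, bitmap) in bitmaps.iter().enumerate() {
            table.insert(Key::Region(i as u32), vec![0; bitmap.len()]);
        }
        table.insert(Key::TransactionId, txn_id.to_le_bytes().to_vec());

        // Phase 2 (save): overwrite the placeholders once the final allocator
        // state is known; the lengths are guaranteed to match.
        for (i, bitmap) in bitmaps.iter().enumerate() {
            *table.get_mut(&Key::Region(i as u32)).unwrap() = bitmap.clone();
        }
    }

    // On open: trust the saved state only if the last commit wrote it.
    fn try_load(table: &BTreeMap<Key, Vec<u8>>, last_committed: u64) -> bool {
        let saved = u64::from_le_bytes(table[&Key::TransactionId].clone().try_into().unwrap());
        saved == last_committed
    }

    fn main() {
        let mut table = BTreeMap::new();
        commit(&mut table, 42, &[vec![1, 2, 3]]);
        assert!(try_load(&table, 42));
        assert!(!try_load(&table, 43)); // stale: fall back to full repair
    }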
@@ -419,6 +527,27 @@
         allocator.is_allocated(page.page_index, page.page_order)
     }
 
+    // Make sure we have a large enough region-tracker page. This uses allocate_non_transactional(),
+    // so it should only be called from outside a transaction
+    fn ensure_region_tracker_page(&self) -> Result {
+        let state = self.state.lock().unwrap();
+        let tracker_len = state.allocators.region_tracker.to_vec().len();
+        let mut tracker_page = state.header.region_tracker();
+        drop(state);
+
+        if tracker_page.page_size_bytes(self.page_size) < (tracker_len as u64) {
+            // Allocate a larger tracker page
+            self.free(tracker_page);
+            tracker_page = self
+                .allocate_non_transactional(tracker_len)?
+                .get_page_number();
+            let mut state = self.state.lock().unwrap();
+            state.header.set_region_tracker(tracker_page);
+        }
+
+        Ok(())
+    }
+
     // Relocates the region tracker to a lower page, if possible
     // Returns true if the page was moved
     pub(crate) fn relocate_region_tracker(&self) -> Result<bool> {
@@ -430,6 +559,9 @@
         let old_tracker_page = state.header.region_tracker();
         // allocate acquires this lock, so we need to drop it
         drop(state);
+        // Allocate the new page. Unlike other region-tracker allocations, this happens inside
+        // a transaction, so we use an ordinary allocation (which gets committed or rolled back
+        // along with the rest of the transaction) rather than allocate_non_transactional()
         let new_page = self.allocate_lowest(region_tracker_size.try_into().unwrap())?;
         if new_page.get_page_number().is_before(old_tracker_page) {
             let mut state = self.state.lock().unwrap();
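The distinction ensure_region_tracker_page() relies on: a transactional allocation is recorded in allocated_since_commit and rolled back on abort, while a non-transactional one is treated as committed immediately. A toy illustration of that difference (PageStore and its methods are illustrative, not redb's API):

    use std::collections::HashSet;

    struct PageStore {
        allocated_since_commit: HashSet<u64>,
        next_page: u64,
    }

    impl PageStore {
        fn allocate_helper(&mut self, transactional: bool) -> u64 {
            let page = self.next_page;
            self.next_page += 1;
            if transactional {
                // Remembered so that an abort can free it again
                self.allocated_since_commit.insert(page);
            }
            page
        }

        fn rollback(&mut self) {
            // A non-transactional page (e.g. the region tracker) survives this
            self.allocated_since_commit.clear();
        }
    }

    fn main() {
        let mut store = PageStore { allocated_since_commit: HashSet::new(), next_page: 0 };
        let txn_page = store.allocate_helper(true);
        let tracker_page = store.allocate_helper(false);
        assert!(store.allocated_since_commit.contains(&txn_page));
        assert!(!store.allocated_since_commit.contains(&tracker_page));
        store.rollback();
    }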
@@ -519,8 +651,10 @@ impl TransactionalMemory {
             self.storage.flush(eventual)?;
         }
 
-        // Make our new commit the primary
+        // Make our new commit the primary, and record whether it was a 2-phase commit.
+        // These two bits need to be written atomically
         header.swap_primary_slot();
+        header.two_phase_commit = two_phase;
 
         // Write the new header to disk
         self.write_header(&header)?;
         self.storage.flush(eventual)?;
@@ -797,7 +931,12 @@ impl TransactionalMemory {
         self.allocated_since_commit.lock().unwrap().contains(&page)
     }
 
-    pub(crate) fn allocate_helper(&self, allocation_size: usize, lowest: bool) -> Result<PageMut> {
+    pub(crate) fn allocate_helper(
+        &self,
+        allocation_size: usize,
+        lowest: bool,
+        transactional: bool,
+    ) -> Result<PageMut> {
         let required_pages = (allocation_size + self.get_page_size() - 1) / self.get_page_size();
         let required_order = ceil_log2(required_pages);
 
@@ -825,10 +964,12 @@ impl TransactionalMemory {
             assert!(!self.open_dirty_pages.lock().unwrap().contains(&page_number));
         }
 
-        self.allocated_since_commit
-            .lock()
-            .unwrap()
-            .insert(page_number);
+        if transactional {
+            self.allocated_since_commit
+                .lock()
+                .unwrap()
+                .insert(page_number);
+        }
 
         let address_range = page_number.address_range(
             self.page_size.into(),
@@ -960,11 +1101,17 @@ impl TransactionalMemory {
     }
 
     pub(crate) fn allocate(&self, allocation_size: usize) -> Result<PageMut> {
-        self.allocate_helper(allocation_size, false)
+        self.allocate_helper(allocation_size, false, true)
     }
 
     pub(crate) fn allocate_lowest(&self, allocation_size: usize) -> Result<PageMut> {
-        self.allocate_helper(allocation_size, true)
+        self.allocate_helper(allocation_size, true, true)
+    }
+
+    // Allocate a page not associated with any transaction. The page is immediately considered committed,
+    // and won't be rolled back if an abort happens. This is only used for the region tracker
+    fn allocate_non_transactional(&self, allocation_size: usize) -> Result<PageMut> {
+        self.allocate_helper(allocation_size, false, false)
     }
 
     pub(crate) fn count_allocated_pages(&self) -> Result<u64> {
@@ -1017,24 +1164,15 @@ impl Drop for TransactionalMemory {
                 warn!("Failure while finalizing non-durable commit. Database may have rolled back");
             }
         }
-        let mut state = self.state.lock().unwrap();
-        let tracker_len = state.allocators.region_tracker.to_vec().len();
-        let tracker_page = state.header.region_tracker();
-        if tracker_page.page_size_bytes(self.page_size) < (tracker_len as u64) {
-            drop(state);
-            // Allocate a larger tracker page
-            self.free(tracker_page);
-            if let Ok(tracker_page) = self.allocate(tracker_len) {
-                state = self.state.lock().unwrap();
-                state
-                    .header
-                    .set_region_tracker(tracker_page.get_page_number());
-            } else {
-                #[cfg(feature = "logging")]
-                warn!("Failure while flushing allocator state. Repair required at restart.");
-                return;
-            }
+
+        // Allocate a larger region-tracker page if necessary
+        if self.ensure_region_tracker_page().is_err() {
+            #[cfg(feature = "logging")]
+            warn!("Failure while flushing allocator state. Repair required at restart.");
+            return;
         }
+
+        let mut state = self.state.lock().unwrap();
         if state
             .allocators
             .flush_to(
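The commit-ordering argument above depends on PRIMARY_BIT and TWO_PHASE_COMMIT sharing the god byte: a single one-byte write updates both, so a crash can never expose one flag without the other. A sketch using the flag constants from this patch (the god_byte helper is illustrative):

    const PRIMARY_BIT: u8 = 1;
    const RECOVERY_REQUIRED: u8 = 2;
    const TWO_PHASE_COMMIT: u8 = 4;

    fn god_byte(primary_slot: u8, recovery_required: bool, two_phase: bool) -> u8 {
        let mut byte = primary_slot & PRIMARY_BIT;
        if recovery_required {
            byte |= RECOVERY_REQUIRED;
        }
        if two_phase {
            byte |= TWO_PHASE_COMMIT;
        }
        byte
    }

    fn main() {
        // Swapping the primary and recording a 2-phase commit is one byte
        // write, so both flags change together or not at all.
        let before = god_byte(0, true, false);
        let after = god_byte(1, true, true);
        assert_eq!(before, RECOVERY_REQUIRED);
        assert_eq!(after, PRIMARY_BIT | RECOVERY_REQUIRED | TWO_PHASE_COMMIT);
    }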
diff --git a/src/tree_store/table_tree.rs b/src/tree_store/table_tree.rs
index fdef1ced..0d6d26a1 100644
--- a/src/tree_store/table_tree.rs
+++ b/src/tree_store/table_tree.rs
@@ -442,6 +442,45 @@ impl<'txn> TableTreeMut<'txn> {
         Ok(self)
     }
 
+    // Creates a new table, calls the provided closure to insert entries into it, and then
+    // flushes the table root. The flush is done using insert_inplace(), so it's guaranteed
+    // that no pages will be allocated or freed after the closure returns
+    pub(crate) fn create_table_and_flush_table_root<K: Key + 'static, V: Value + 'static>(
+        &mut self,
+        name: &str,
+        f: impl FnOnce(&mut BtreeMut<K, V>) -> Result,
+    ) -> Result {
+        assert!(self.pending_table_updates.is_empty());
+        assert!(self.tree.get(&name)?.is_none());
+
+        // Reserve space in the table tree
+        self.tree.insert(
+            &name,
+            &InternalTableDefinition::new::<K, V>(TableType::Normal, None, 0),
+        )?;
+
+        // Create an empty table and call the provided closure on it
+        let mut tree: BtreeMut<K, V> = BtreeMut::new(
+            None,
+            self.guard.clone(),
+            self.mem.clone(),
+            self.freed_pages.clone(),
+        );
+        f(&mut tree)?;
+
+        // Finalize the table's checksums
+        let table_root = tree.finalize_dirty_checksums()?;
+        let table_length = tree.get_root().map(|x| x.length).unwrap_or_default();
+
+        // Flush the root to the table tree, without allocating
+        self.tree.insert_inplace(
+            &name,
+            &InternalTableDefinition::new::<K, V>(TableType::Normal, table_root, table_length),
+        )?;
+
+        Ok(())
+    }
+
     pub(crate) fn finalize_dirty_checksums(&mut self) -> Result<Option<BtreeHeader>> {
         self.tree.finalize_dirty_checksums()
     }
diff --git a/src/tree_store/table_tree_base.rs b/src/tree_store/table_tree_base.rs
index b3f3dbd1..e2bae031 100644
--- a/src/tree_store/table_tree_base.rs
+++ b/src/tree_store/table_tree_base.rs
@@ -441,6 +441,10 @@ impl Value for InternalTableDefinition {
         }
     }
 
+    // Be careful if you change this serialization format! The InternalTableDefinition for
+    // a given table needs to have a consistent serialized length, regardless of the table
+    // contents, so that create_table_and_flush_table_root() can update the allocator state
+    // table without causing more allocations
     fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Vec<u8>
     where
         Self: 'b,
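The fixed-length requirement on InternalTableDefinition's serialization exists because the placeholder written at reserve time and the finalized value must occupy exactly the same number of bytes, or the insert_inplace() above would panic. A hypothetical encoding that satisfies the constraint (this is not redb's actual format, just an illustration of the idea):

    fn serialize(root_page: Option<u64>, length: u64) -> Vec<u8> {
        let mut out = Vec::with_capacity(17);
        // Encode the Option as a tag byte plus a fixed-width page number,
        // rather than omitting the page number when it's None.
        out.push(root_page.is_some() as u8);
        out.extend_from_slice(&root_page.unwrap_or(0).to_le_bytes());
        out.extend_from_slice(&length.to_le_bytes());
        out
    }

    fn main() {
        let placeholder = serialize(None, 0);
        let finalized = serialize(Some(42), 1000);
        // Equal lengths mean the in-place overwrite can't trigger an allocation.
        assert_eq!(placeholder.len(), finalized.len());
    }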