From f2e1eebecad41b6f17104f8ec16426df720926fe Mon Sep 17 00:00:00 2001
From: Michael Constant
Date: Wed, 30 Oct 2024 02:48:02 -0700
Subject: [PATCH] Add new ParanoidPlus durability level, which doesn't require repair

To avoid repair, Durability::ParanoidPlus commits need to save the allocator state somewhere. We can't use the region headers, because we'd be overwriting them in place; we might crash partway through the overwrite, and then we'd need repair. So we instead save the allocator state to a new table in the system tree. Writing to the table is slightly tricky, because it needs to be done without allocating (see below), but other than that it's a perfectly ordinary transactional write with all the usual guarantees.

The other requirement to avoid repair is knowing whether the last transaction used 2-phase commit. For this, we add a new two_phase_commit bit to the god byte, which is always updated atomically along with swapping the primary bit. Old redb versions will ignore the new flag when reading and clear it when writing, which is exactly what we want.

This turns out to also fix a longstanding bug where Durability::Paranoid hasn't been providing any security benefit at all. The checksum forgery attack described in the Durability::Immediate documentation actually works equally well against Durability::Paranoid! The problem is that even though 2-phase commit guarantees the primary is valid, redb ignores the primary flag when repairing. It always picks whichever commit slot is newer, as long as the checksum is valid. So if you crash partway through a commit, it'll try to recover using the partially-written secondary rather than the fully-written primary, regardless of the durability mode.

The fix for this is exactly the two_phase_commit bit described above. After a crash, we check whether the last transaction used 2-phase commit; if so, we only look at the primary (which is guaranteed to be valid) and ignore the secondary. Durability::ParanoidPlus needs this check anyway for safety, so we get the Durability::Paranoid bug fix for free.

To write to the allocator state table without allocating, I've introduced a new insert_inplace() function. It's similar to insert_reserve(), but more general and maybe simpler. To use it, you have to first do an ordinary insert() with your desired key and a value of the appropriate length; then later in the same transaction you can call insert_inplace() to replace the value with a new one. Unlike insert_reserve(), this works with values that don't implement MutInPlaceValue, and it lets you hold multiple reservations simultaneously.

insert_inplace() could be safely exposed to users, but I don't think there's any reason to. Since it doesn't give you a mutable reference, there's no benefit over insert() unless you're storing data that cares about its own position in the database. So for now it's private, and I haven't bothered making a new error type for it; it just panics if you don't satisfy the preconditions.

The fuzzer is perfect for testing Durability::ParanoidPlus, because it can simulate a crash, reopen the database (skipping repair if possible), and then verify that the resulting allocator state exactly matches what would happen if it ran a full repair. I've updated the fuzzer to generate Durability::ParanoidPlus commits along with the existing Durability::None and Durability::Immediate.
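As a minimal, self-contained sketch of the god-byte change described above: the flag values (1, 2, 4) match the PRIMARY_BIT, RECOVERY_REQUIRED, and new TWO_PHASE_COMMIT constants this patch adds to src/tree_store/page_store/header.rs, but the next_god_byte() helper below is purely illustrative and not redb's actual code.

```rust
// Illustrative only: models how swapping the primary slot and recording
// 2-phase commit land in one single-byte update. Constants mirror the patch.
const PRIMARY_BIT: u8 = 1;
const RECOVERY_REQUIRED: u8 = 2;
const TWO_PHASE_COMMIT: u8 = 4;

/// Build the new god byte for a commit: flip the primary slot and record
/// whether this commit used 2-phase commit. Because both changes are part of
/// the same single-byte write, old and new flags can never be observed mixed.
fn next_god_byte(current: u8, two_phase: bool) -> u8 {
    let mut next = current ^ PRIMARY_BIT; // swap primary slot 0 <-> 1
    next &= !TWO_PHASE_COMMIT;
    if two_phase {
        next |= TWO_PHASE_COMMIT;
    }
    next
}

fn main() {
    // Mid-transaction state: slot 0 is primary, recovery_required is set.
    let god = RECOVERY_REQUIRED;

    // A ParanoidPlus (2-phase) commit flips the primary and sets the flag.
    let after_paranoid_plus = next_god_byte(god, true);
    assert_eq!(after_paranoid_plus & PRIMARY_BIT, PRIMARY_BIT);
    assert_eq!(after_paranoid_plus & TWO_PHASE_COMMIT, TWO_PHASE_COMMIT);

    // A later 1-phase commit clears the flag again, in the same byte write.
    let after_immediate = next_god_byte(after_paranoid_plus, false);
    assert_eq!(after_immediate & PRIMARY_BIT, 0);
    assert_eq!(after_immediate & TWO_PHASE_COMMIT, 0);
}
```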
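The reserve-then-overwrite pattern behind insert_inplace() can also be shown with a toy model. This is not redb's BtreeMut; it's a HashMap-backed stand-in that only mirrors the preconditions the real function asserts (the key was inserted earlier, and the new value is no longer than the reserved one), so that the second step never allocates.

```rust
use std::collections::HashMap;

/// Toy stand-in: `insert()` may allocate, `insert_inplace()` never does.
struct ToyTree {
    entries: HashMap<String, Vec<u8>>,
}

impl ToyTree {
    fn new() -> Self {
        Self { entries: HashMap::new() }
    }

    /// Ordinary insert: free to allocate.
    fn insert(&mut self, key: &str, value: &[u8]) {
        self.entries.insert(key.to_string(), value.to_vec());
    }

    /// In-place insert: reuses the existing buffer, so no allocation happens.
    /// Panics if the preconditions aren't met, like the real function.
    fn insert_inplace(&mut self, key: &str, value: &[u8]) {
        let slot = self.entries.get_mut(key).expect("key must be reserved first");
        assert!(value.len() <= slot.len(), "value must fit in the reservation");
        slot[..value.len()].copy_from_slice(value);
    }
}

fn main() {
    let mut tree = ToyTree::new();
    // Step 1: reserve space with a placeholder of the final serialized length.
    tree.insert("region_tracker", &vec![0u8; 8]);
    // Step 2, later in the same "transaction": overwrite without allocating.
    tree.insert_inplace("region_tracker", &42u64.to_le_bytes());
    assert_eq!(tree.entries["region_tracker"], 42u64.to_le_bytes());
}
```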
--- docs/design.md | 29 ++- fuzz/fuzz_targets/common.rs | 12 +- fuzz/fuzz_targets/fuzz_redb.rs | 16 +- src/db.rs | 87 ++++++-- src/transactions.rs | 171 ++++++++++++--- src/tree_store/btree.rs | 17 ++ src/tree_store/btree_mutator.rs | 45 +++- src/tree_store/page_store/header.rs | 54 ++++- src/tree_store/page_store/page_manager.rs | 241 +++++++++++++++++----- src/tree_store/table_tree.rs | 39 ++++ src/tree_store/table_tree_base.rs | 4 + 11 files changed, 602 insertions(+), 113 deletions(-) diff --git a/docs/design.md b/docs/design.md index 882beb6c..500d0e45 100644 --- a/docs/design.md +++ b/docs/design.md @@ -88,12 +88,16 @@ controls which transaction pointer is the primary. `magic number` must be set to the ASCII letters 'redb' followed by 0x1A, 0x0A, 0xA9, 0x0D, 0x0A. This sequence is inspired by the PNG magic number. -`god byte`, so named because this byte controls the state of the entire database, is a bitfield containing two flags: +`god byte`, so named because this byte controls the state of the entire database, is a bitfield containing three flags: * first bit: `primary_bit` flag which indicates whether transaction slot 0 or transaction slot 1 contains the latest commit. - redb relies on the fact that this is a single bit to perform atomic commits. -* second bit: `recovery_required` flag, if set then the recovery process must be run when opening the database. - During the recovery process, the region tracker and regional allocator states -- described below -- are reconstructed - by walking the btree from all active roots. +* second bit: `recovery_required` flag, if set then the recovery process must be run when opening the database. This can be + a full repair, in which the region tracker and regional allocator states -- described below -- are reconstructed by walking + the btree from all active roots, or a quick-repair, in which the state is simply loaded from the allocator state table. +* third bit: `two_phase_commit` flag, which indicates whether the transaction in the primary slot was written using 2-phase + commit. If so, the primary slot is guaranteed to be valid, and repair won't look at the secondary slot. This flag is always + updated atomically along with the primary bit. + +redb relies on the fact that this is a single byte to perform atomic commits. `page size` is the size of a redb page in bytes @@ -155,7 +159,9 @@ changed during an upgrade. ### Region tracker The region tracker is an array of `BtreeBitmap`s that tracks the page orders which are free in each region. -It is stored in a page in the data section of a region: +There are two different places it can be stored: on shutdown, it's written to a page in the data section of +a region, and when making a commit with `Durability::ParanoidPlus`, it's written to an entry in the allocator +state table. The former is valid only after a clean shutdown; the latter is usable even after a crash. ``` <-------------------------------------------- 8 bytes -------------------------------------------> ================================================================================================== @@ -216,6 +222,11 @@ range has been allocated * n bytes: free index data * n bytes: allocated data +Like the region tracker, there are two different places where the regional allocator state can be +stored. On shutdown, it's written to the region header as described above, and when making a commit +with `Durability::ParanoidPlus`, it's written to an entry in the allocator state table. 
The former +is valid only after a clean shutdown; the latter is usable even after a crash. + ``` <-------------------------------------------- 8 bytes -------------------------------------------> ================================================================================================== @@ -456,7 +467,7 @@ exists. Then, (2) will be accomplished by moving all allocations from transactio savepoint into the pending free state. #### Database repair -To repair the database after an unclean shutdown we must: +To do a full repair after an unclean shutdown we must: 1) Update the super header to reference the last fully committed transaction 2) Update the allocator state, so that it is consistent with all the database roots in the above transaction @@ -472,6 +483,10 @@ All pages referenced by a savepoint must be contained in the above, because it i a) referenced directly by the data, system, or freed tree -- i.e. it's a committed page b) it is not referenced, in which case it is in the pending free state and is contained in the freed tree +Alternatively, we might be able to do a quick-repair. This is only possible if the last transaction +used 2-phase commit (so we know the primary slot is valid, without needing to walk the trees to verify +their checksums) and also saved its allocator state to the allocator state tree. + # Assumptions about underlying media redb is designed to be safe even in the event of power failure or on poorly behaved media. Therefore, we make only a few assumptions about the guarantees provided by the underlying filesystem: diff --git a/fuzz/fuzz_targets/common.rs b/fuzz/fuzz_targets/common.rs index faec1d70..190022d8 100644 --- a/fuzz/fuzz_targets/common.rs +++ b/fuzz/fuzz_targets/common.rs @@ -107,6 +107,16 @@ impl Arbitrary<'_> for BoundedUSize { } } +// We don't simulate fsync(), so it's not interesting to fuzz with Durability::Eventual +// or Durability::Paranoid (they're mostly equivalent to Durability::Immediate). 
But the +// other three levels are all worth testing +#[derive(Arbitrary, Debug, Clone, PartialEq)] +pub(crate) enum FuzzDurability { + None, + Immediate, + ParanoidPlus, +} + #[derive(Arbitrary, Debug, Clone)] pub(crate) enum FuzzOperation { Get { @@ -163,7 +173,7 @@ pub(crate) enum FuzzOperation { #[derive(Arbitrary, Debug, Clone)] pub(crate) struct FuzzTransaction { pub ops: Vec, - pub durable: bool, + pub durability: FuzzDurability, pub commit: bool, pub create_ephemeral_savepoint: bool, pub create_persistent_savepoint: bool, diff --git a/fuzz/fuzz_targets/fuzz_redb.rs b/fuzz/fuzz_targets/fuzz_redb.rs index 599784d0..fb3ee971 100644 --- a/fuzz/fuzz_targets/fuzz_redb.rs +++ b/fuzz/fuzz_targets/fuzz_redb.rs @@ -580,9 +580,11 @@ fn exec_table_crash_support(config: &FuzzConfig, apply: fn(WriteTransa // Disable IO error simulation while we update the transaction counter table let old_countdown = countdown.swap(u64::MAX, Ordering::SeqCst); let mut txn = db.begin_write().unwrap(); - if !transaction.durable { - txn.set_durability(Durability::None); - } + txn.set_durability(match transaction.durability { + FuzzDurability::None => Durability::None, + FuzzDurability::Immediate => Durability::Immediate, + FuzzDurability::ParanoidPlus => Durability::ParanoidPlus, + }); let mut counter_table = txn.open_table(COUNTER_TABLE).unwrap(); let uncommitted_id = txn_id as u64 + 1; counter_table.insert((), uncommitted_id)?; @@ -627,9 +629,9 @@ fn exec_table_crash_support(config: &FuzzConfig, apply: fn(WriteTransa let commit_succeeded = last_committed == uncommitted_id; if commit_succeeded { assert!(transaction.commit); - savepoint_manager.commit(transaction.durable); + savepoint_manager.commit(transaction.durability != FuzzDurability::None); non_durable_reference = uncommitted_reference; - if transaction.durable { + if transaction.durability != FuzzDurability::None { reference = non_durable_reference.clone(); } } else { @@ -747,7 +749,7 @@ fn apply_crashable_transaction_multimap(txn: WriteTransaction, uncommitted_refer } if transaction.commit { - if transaction.durable { + if transaction.durability != FuzzDurability::None { savepoints.gc_persistent_savepoints(&txn)?; } txn.commit()?; @@ -767,7 +769,7 @@ fn apply_crashable_transaction(txn: WriteTransaction, uncommitted_reference: &mu } if transaction.commit { - if transaction.durable { + if transaction.durability != FuzzDurability::None { savepoints.gc_persistent_savepoints(&txn)?; } txn.commit()?; diff --git a/src/db.rs b/src/db.rs index 686184aa..194a2eee 100644 --- a/src/db.rs +++ b/src/db.rs @@ -21,7 +21,9 @@ use std::sync::{Arc, Mutex}; use crate::error::TransactionError; use crate::sealed::Sealed; -use crate::transactions::SAVEPOINT_TABLE; +use crate::transactions::{ + AllocatorStateKey, AllocatorStateTree, ALLOCATOR_STATE_TABLE_NAME, SAVEPOINT_TABLE, +}; use crate::tree_store::file_backend::FileBackend; #[cfg(feature = "logging")] use log::{debug, info, warn}; @@ -431,7 +433,9 @@ impl Database { return Err(CompactionError::TransactionInProgress); } // Commit to free up any pending free pages - // Use 2-phase commit to avoid any possible security issues. Plus this compaction is going to be so slow that it doesn't matter + // Use 2-phase commit to avoid any possible security issues. Plus this compaction is going to be so slow that it doesn't matter. 
+ // Once https://github.com/cberner/redb/issues/829 is fixed, we should upgrade this to use Durability::ParanoidPlus instead -- + // that way the user can cancel the compaction without requiring repair afterwards let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?; if txn.list_persistent_savepoints()?.next().is_some() { return Err(CompactionError::PersistentSavepointExists); @@ -611,6 +615,12 @@ impl Database { repair_callback: &(dyn Fn(&mut RepairSession) + 'static), ) -> Result<[Option; 3], DatabaseError> { if !Self::verify_primary_checksums(mem.clone())? { + if mem.used_two_phase_commit() { + return Err(DatabaseError::Storage(StorageError::Corrupted( + "Primary is corrupted despite 2-phase commit".to_string(), + ))); + } + // 0.3 because the repair takes 3 full scans and the first is done now let mut handle = RepairSession::new(0.3); repair_callback(&mut handle); @@ -703,23 +713,28 @@ impl Database { )?; let mut mem = Arc::new(mem); if mem.needs_repair()? { - #[cfg(feature = "logging")] - warn!("Database {:?} not shutdown cleanly. Repairing", &file_path); - let mut handle = RepairSession::new(0.0); - repair_callback(&mut handle); - if handle.aborted() { - return Err(DatabaseError::RepairAborted); + // If the last transaction used 2-phase commit and updated the allocator state table, then + // we can just load the allocator state from there. Otherwise, we need a full repair + if !Self::try_quick_repair(mem.clone())? { + #[cfg(feature = "logging")] + warn!("Database {:?} not shutdown cleanly. Repairing", &file_path); + let mut handle = RepairSession::new(0.0); + repair_callback(&mut handle); + if handle.aborted() { + return Err(DatabaseError::RepairAborted); + } + let [data_root, system_root, freed_root] = + Self::do_repair(&mut mem, repair_callback)?; + let next_transaction_id = mem.get_last_committed_transaction_id()?.next(); + mem.commit( + data_root, + system_root, + freed_root, + next_transaction_id, + false, + true, + )?; } - let [data_root, system_root, freed_root] = Self::do_repair(&mut mem, repair_callback)?; - let next_transaction_id = mem.get_last_committed_transaction_id()?.next(); - mem.commit( - data_root, - system_root, - freed_root, - next_transaction_id, - false, - true, - )?; } mem.begin_writable()?; @@ -754,6 +769,42 @@ impl Database { Ok(db) } + // Returns true if quick-repair was successful, or false if a full repair is needed + fn try_quick_repair(mem: Arc) -> Result { + // Quick-repair is only possible if the primary was written using 2-phase commit + if !mem.used_two_phase_commit() { + return Ok(false); + } + + // See if the allocator state table is present in the system table tree + let fake_freed_pages = Arc::new(Mutex::new(vec![])); + let system_table_tree = TableTreeMut::new( + mem.get_system_root(), + Arc::new(TransactionGuard::fake()), + mem.clone(), + fake_freed_pages.clone(), + ); + let Some(allocator_state_table) = system_table_tree + .get_table::(ALLOCATOR_STATE_TABLE_NAME, TableType::Normal) + .map_err(|e| e.into_storage_error_or_corrupted("Unexpected TableError"))? + else { + return Ok(false); + }; + + // Load the allocator state from the table + let InternalTableDefinition::Normal { table_root, .. 
} = allocator_state_table else { + unreachable!(); + }; + let tree = AllocatorStateTree::new( + table_root, + Arc::new(TransactionGuard::fake()), + mem.clone(), + fake_freed_pages, + ); + + mem.try_load_allocator_state(&tree) + } + fn allocate_read_transaction(&self) -> Result { let id = self .transaction_tracker diff --git a/src/transactions.rs b/src/transactions.rs index 19c2336d..99c28c22 100644 --- a/src/transactions.rs +++ b/src/transactions.rs @@ -13,8 +13,8 @@ use crate::types::{Key, Value}; use crate::{ AccessGuard, MultimapTable, MultimapTableDefinition, MultimapTableHandle, Range, ReadOnlyMultimapTable, ReadOnlyTable, Result, Savepoint, SavepointError, StorageError, Table, - TableDefinition, TableError, TableHandle, TransactionError, UntypedMultimapTableHandle, - UntypedTableHandle, + TableDefinition, TableError, TableHandle, TransactionError, TypeName, + UntypedMultimapTableHandle, UntypedTableHandle, }; #[cfg(feature = "logging")] use log::{debug, warn}; @@ -23,6 +23,7 @@ use std::cmp::min; use std::collections::{BTreeMap, HashMap, HashSet}; use std::fmt::{Debug, Display, Formatter}; use std::marker::PhantomData; +use std::mem::size_of; use std::ops::RangeBounds; #[cfg(any(test, fuzzing))] use std::ops::RangeFull; @@ -35,6 +36,70 @@ const NEXT_SAVEPOINT_TABLE: SystemTableDefinition<(), SavepointId> = SystemTableDefinition::new("next_savepoint_id"); pub(crate) const SAVEPOINT_TABLE: SystemTableDefinition = SystemTableDefinition::new("persistent_savepoints"); +// The allocator state table is stored in the system table tree, but it's accessed using +// raw btree operations rather than open_system_table(), so there's no SystemTableDefinition +pub(crate) const ALLOCATOR_STATE_TABLE_NAME: &str = "allocator_state"; +pub(crate) type AllocatorStateTree<'a> = BtreeMut<'a, AllocatorStateKey, &'static [u8]>; + +#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug)] +pub(crate) enum AllocatorStateKey { + Region(u32), + RegionTracker, + TransactionId, +} + +impl Value for AllocatorStateKey { + type SelfType<'a> = Self; + type AsBytes<'a> = [u8; 1 + size_of::()]; + + fn fixed_width() -> Option { + Some(1 + size_of::()) + } + + fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a> + where + Self: 'a, + { + match data[0] { + 0 => Self::Region(u32::from_le_bytes(data[1..].try_into().unwrap())), + 1 => Self::RegionTracker, + 2 => Self::TransactionId, + _ => unreachable!(), + } + } + + fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a> + where + Self: 'a, + Self: 'b, + { + let mut result = Self::AsBytes::default(); + match value { + Self::Region(region) => { + result[0] = 0; + result[1..].copy_from_slice(&u32::to_le_bytes(*region)); + } + Self::RegionTracker => { + result[0] = 1; + } + Self::TransactionId => { + result[0] = 2; + } + } + + result + } + + fn type_name() -> TypeName { + TypeName::internal("redb::AllocatorStateKey") + } +} + +impl Key for AllocatorStateKey { + fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering { + Self::from_bytes(data1).cmp(&Self::from_bytes(data2)) + } +} pub struct SystemTableDefinition<'a, K: Key + 'static, V: Value + 'static> { name: &'a str, @@ -159,7 +224,8 @@ pub enum Durability { /// 3. Call `fsync` to ensure all writes have been persisted to disk /// /// When opening the database after a crash, the most recent of the two commit slots with a - /// valid checksum is used. + /// valid checksum is used. 
Validating the checksum requires recursively checksumming the + /// entire database, so this can be very slow if the database is large. /// /// Security considerations: The checksum used is xxhash, a fast, non-cryptographic hash /// function with close to perfect collision resistance when used with non-malicious input. An @@ -185,12 +251,18 @@ pub enum Durability { /// could cause a transaction to partially commit (some but not all of the data is written). /// This is described in the design doc in futher detail. /// + /// Recovery after a crash is still very slow for large databases, because it needs to read + /// the entire database to reconstruct the allocator state. + /// /// Security considerations: Many hard disk drives and SSDs do not actually guarantee that data /// has been persisted to disk after calling `fsync`. Even with this commit level, an attacker /// with a high degree of control over the database's workload, including the ability to cause /// the database process to crash, can cause the database to crash with the god byte primary bit /// pointing to an invalid commit slot, leaving the database in an invalid, potentially attacker-controlled state. Paranoid, + /// Like [`Durability::Paranoid`], but also saves the allocator state as part of each commit. + /// This means repair is no longer required when opening the database after a crash. + ParanoidPlus, } // Like a Table but only one may be open at a time to avoid possible races @@ -597,7 +669,7 @@ impl WriteTransaction { pub fn persistent_savepoint(&self) -> Result { if !matches!( self.durability, - Durability::Immediate | Durability::Paranoid + Durability::Immediate | Durability::Paranoid | Durability::ParanoidPlus ) { return Err(SavepointError::InvalidSavepoint); } @@ -661,7 +733,7 @@ impl WriteTransaction { pub fn delete_persistent_savepoint(&self, id: u64) -> Result { if !matches!( self.durability, - Durability::Immediate | Durability::Paranoid + Durability::Immediate | Durability::Paranoid | Durability::ParanoidPlus ) { return Err(SavepointError::InvalidSavepoint); } @@ -1007,9 +1079,10 @@ impl WriteTransaction { ); match self.durability { Durability::None => self.non_durable_commit()?, - Durability::Eventual => self.durable_commit(true, false)?, - Durability::Immediate => self.durable_commit(false, false)?, - Durability::Paranoid => self.durable_commit(false, true)?, + Durability::Eventual => self.durable_commit(true, false, false)?, + Durability::Immediate => self.durable_commit(false, false, false)?, + Durability::Paranoid => self.durable_commit(false, true, false)?, + Durability::ParanoidPlus => self.durable_commit(false, true, true)?, } for (savepoint, transaction) in self.deleted_persistent_savepoints.lock().unwrap().iter() { @@ -1062,11 +1135,17 @@ impl WriteTransaction { Ok(()) } - pub(crate) fn durable_commit(&mut self, eventual: bool, two_phase: bool) -> Result { + pub(crate) fn durable_commit( + &mut self, + eventual: bool, + two_phase: bool, + save_allocator_state: bool, + ) -> Result { let free_until_transaction = self .transaction_tracker .oldest_live_read_transaction() .map_or(self.transaction_id, |x| x.next()); + self.process_freed_pages(free_until_transaction)?; let user_root = self .tables @@ -1076,21 +1155,54 @@ impl WriteTransaction { .flush_table_root_updates()? .finalize_dirty_checksums()?; - let system_root = self - .system_tables - .lock() - .unwrap() - .table_tree - .flush_table_root_updates()? 
- .finalize_dirty_checksums()?; + let mut system_tables = self.system_tables.lock().unwrap(); + let system_tree = system_tables.table_tree.flush_table_root_updates()?; + system_tree + .delete_table(ALLOCATOR_STATE_TABLE_NAME, TableType::Normal) + .map_err(|e| e.into_storage_error_or_corrupted("Unexpected TableError"))?; + + if save_allocator_state { + system_tree.create_table_and_flush_table_root( + ALLOCATOR_STATE_TABLE_NAME, + |tree: &mut AllocatorStateTree| { + let mut pagination_counter = 0; + + loop { + let num_regions = self + .mem + .reserve_allocator_state(tree, self.transaction_id)?; + + // We can't free pages after the commit, because that would invalidate our + // saved allocator state. Everything needs to go through the transactional + // free mechanism + self.store_freed_pages(&mut pagination_counter, true)?; + + if self.mem.try_save_allocator_state(tree, num_regions)? { + return Ok(()); + } + + // Clear out the table before retrying, just in case the number of regions + // has somehow shrunk. Don't use retain_in() for this, since it doesn't + // free the pages immediately -- we need to reuse those pages to guarantee + // that our retry loop will eventually terminate + while let Some(guards) = tree.last()? { + let key = guards.0.value(); + drop(guards); + tree.remove(&key)?; + } + } + }, + )?; + } else { + // If a savepoint exists it might reference the freed-tree, since it holds a reference to the + // root of the freed-tree. Therefore, we must use the transactional free mechanism to free + // those pages. If there are no save points then these can be immediately freed, which is + // done at the end of this function. + let savepoint_exists = self.transaction_tracker.any_savepoint_exists(); + self.store_freed_pages(&mut 0, savepoint_exists)?; + } - self.process_freed_pages(free_until_transaction)?; - // If a savepoint exists it might reference the freed-tree, since it holds a reference to the - // root of the freed-tree. Therefore, we must use the transactional free mechanism to free - // those pages. If there are no save points then these can be immediately freed, which is - // done at the end of this function. - let savepoint_exists = self.transaction_tracker.any_savepoint_exists(); - self.store_freed_pages(savepoint_exists)?; + let system_root = system_tree.finalize_dirty_checksums()?; // Finalize freed table checksums, before doing the final commit let freed_root = self.freed_tree.lock().unwrap().finalize_dirty_checksums()?; @@ -1136,7 +1248,7 @@ impl WriteTransaction { // Store all freed pages for a future commit(), since we can't free pages during a // non-durable commit (it's non-durable, so could be rolled back anytime in the future) - self.store_freed_pages(true)?; + self.store_freed_pages(&mut 0, true)?; // Finalize all checksums, before doing the final commit let freed_root = self.freed_tree.lock().unwrap().finalize_dirty_checksums()?; @@ -1242,10 +1354,13 @@ impl WriteTransaction { Ok(()) } - fn store_freed_pages(&mut self, include_post_commit_free: bool) -> Result { + fn store_freed_pages( + &self, + pagination_counter: &mut u64, + include_post_commit_free: bool, + ) -> Result { assert_eq!(PageNumber::serialized_size(), 8); // We assume below that PageNumber is length 8 - let mut pagination_counter = 0u64; let mut freed_tree = self.freed_tree.lock().unwrap(); if include_post_commit_free { // Move all the post-commit pages that came from the freed-tree. 
These need to be stored @@ -1260,7 +1375,7 @@ impl WriteTransaction { let buffer_size = FreedPageList::required_bytes(chunk_size); let key = FreedTableKey { transaction_id: self.transaction_id.raw_id(), - pagination_id: pagination_counter, + pagination_id: *pagination_counter, }; let mut access_guard = freed_tree.insert_reserve(&key, buffer_size.try_into().unwrap())?; @@ -1273,7 +1388,7 @@ impl WriteTransaction { } drop(access_guard); - pagination_counter += 1; + *pagination_counter += 1; if include_post_commit_free { // Move all the post-commit pages that came from the freed-tree. These need to be stored diff --git a/src/tree_store/btree.rs b/src/tree_store/btree.rs index ac3bc784..541e9e3d 100644 --- a/src/tree_store/btree.rs +++ b/src/tree_store/btree.rs @@ -434,6 +434,23 @@ impl BtreeMut<'_, K, V> { Ok(old_value) } + // Insert without allocating or freeing any pages. This requires that you've previously + // inserted the same key, with a value of at least the same serialized length, earlier + // in the same transaction. If those preconditions aren't satisfied, insert_inplace() + // will panic; it won't allocate under any circumstances + pub(crate) fn insert_inplace( + &mut self, + key: &K::SelfType<'_>, + value: &V::SelfType<'_>, + ) -> Result<()> { + let mut fake_freed_pages = vec![]; + let mut operation = + MutateHelper::::new(&mut self.root, self.mem.clone(), fake_freed_pages.as_mut()); + operation.insert_inplace(key, value)?; + assert!(fake_freed_pages.is_empty()); + Ok(()) + } + pub(crate) fn remove(&mut self, key: &K::SelfType<'_>) -> Result>> { #[cfg(feature = "logging")] trace!("Btree(root={:?}): Deleting {:?}", &self.root, key); diff --git a/src/tree_store/btree_mutator.rs b/src/tree_store/btree_mutator.rs index 7c50335c..3d1790cd 100644 --- a/src/tree_store/btree_mutator.rs +++ b/src/tree_store/btree_mutator.rs @@ -5,7 +5,7 @@ use crate::tree_store::btree_base::{ use crate::tree_store::btree_mutator::DeletionResult::{ DeletedBranch, DeletedLeaf, PartialBranch, PartialLeaf, Subtree, }; -use crate::tree_store::page_store::{Page, PageImpl}; +use crate::tree_store::page_store::{Page, PageImpl, PageMut}; use crate::tree_store::{ AccessGuardMut, BtreeHeader, PageNumber, RawLeafBuilder, TransactionalMemory, }; @@ -496,6 +496,49 @@ impl<'a, 'b, K: Key, V: Value> MutateHelper<'a, 'b, K, V> { }) } + pub(crate) fn insert_inplace( + &mut self, + key: &K::SelfType<'_>, + value: &V::SelfType<'_>, + ) -> Result<()> { + assert!(self.modify_uncommitted); + let header = self.root.expect("Key not found (tree is empty)"); + self.insert_inplace_helper( + self.mem.get_page_mut(header.root)?, + K::as_bytes(key).as_ref(), + V::as_bytes(value).as_ref(), + )?; + *self.root = Some(BtreeHeader::new(header.root, DEFERRED, header.length)); + Ok(()) + } + + fn insert_inplace_helper(&mut self, mut page: PageMut, key: &[u8], value: &[u8]) -> Result<()> { + assert!(self.mem.uncommitted(page.get_page_number())); + + let node_mem = page.memory(); + match node_mem[0] { + LEAF => { + let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), V::fixed_width()); + let (position, found) = accessor.position::(key); + assert!(found); + let old_len = accessor.entry(position).unwrap().value().len(); + assert!(value.len() <= old_len); + let mut mutator = LeafMutator::new(&mut page, K::fixed_width(), V::fixed_width()); + mutator.insert(position, true, key, value); + } + BRANCH => { + let accessor = BranchAccessor::new(&page, K::fixed_width()); + let (child_index, child_page) = accessor.child_for_key::(key); + 
self.insert_inplace_helper(self.mem.get_page_mut(child_page)?, key, value)?; + let mut mutator = BranchMutator::new(&mut page); + mutator.write_child_page(child_index, child_page, DEFERRED); + } + _ => unreachable!(), + } + + Ok(()) + } + fn delete_leaf_helper( &mut self, page: PageImpl, diff --git a/src/tree_store/page_store/header.rs b/src/tree_store/page_store/header.rs index 81820fd2..351a2db2 100644 --- a/src/tree_store/page_store/header.rs +++ b/src/tree_store/page_store/header.rs @@ -5,7 +5,7 @@ use crate::tree_store::page_store::page_manager::{ xxh3_checksum, FILE_FORMAT_VERSION1, FILE_FORMAT_VERSION2, }; use crate::tree_store::{Checksum, PageNumber}; -use crate::{DatabaseError, StorageError}; +use crate::{DatabaseError, Result, StorageError}; use std::mem::size_of; // Database layout: @@ -58,6 +58,7 @@ pub(super) const DB_HEADER_SIZE: usize = TRANSACTION_1_OFFSET + TRANSACTION_SIZE // God byte flags const PRIMARY_BIT: u8 = 1; const RECOVERY_REQUIRED: u8 = 2; +const TWO_PHASE_COMMIT: u8 = 4; // Structure of each commit slot const VERSION_OFFSET: usize = 0; @@ -95,6 +96,7 @@ pub(super) struct HeaderRepairInfo { pub(super) struct DatabaseHeader { primary_slot: usize, pub(super) recovery_required: bool, + pub(super) two_phase_commit: bool, page_size: u32, region_header_pages: u32, region_max_data_pages: u32, @@ -120,6 +122,7 @@ impl DatabaseHeader { Self { primary_slot: 0, recovery_required: true, + two_phase_commit: false, page_size: layout.full_region_layout().page_size(), region_header_pages: layout.full_region_layout().get_header_pages(), region_max_data_pages: layout.full_region_layout().num_pages(), @@ -194,12 +197,57 @@ impl DatabaseHeader { self.primary_slot ^= 1; } + // Figure out which slot to use as the primary when starting a repair. The repair process might + // still switch to the other slot later, if the tree checksums turn out to be invalid. + // + // Returns true if we picked the original primary, or false if we swapped + pub(super) fn pick_primary_for_repair( + &mut self, + repair_info: HeaderRepairInfo, + ) -> Result { + // If the primary was written using 2-phase commit, it's guaranteed to be valid. Don't look + // at the secondary; even if it happens to have a valid checksum, Durability::Paranoid means + // we can't trust it + if self.two_phase_commit { + if repair_info.primary_corrupted { + return Err(StorageError::Corrupted( + "Primary is corrupted despite 2-phase commit".to_string(), + )); + } + return Ok(true); + } + + // Pick whichever slot is newer, assuming it has a valid checksum. 
This handles an edge case + // where we crash during fsync(), and the only data that got written to disk was the god byte + // update swapping the primary -- in that case, the primary contains a valid but out-of-date + // transaction, so we need to load from the secondary instead + if repair_info.primary_corrupted { + if repair_info.secondary_corrupted { + return Err(StorageError::Corrupted( + "Both commit slots are corrupted".to_string(), + )); + } + self.swap_primary_slot(); + return Ok(false); + } + + let secondary_newer = + self.secondary_slot().transaction_id > self.primary_slot().transaction_id; + if secondary_newer && !repair_info.secondary_corrupted { + self.swap_primary_slot(); + return Ok(false); + } + + Ok(true) + } + // TODO: consider returning an Err with the repair info pub(super) fn from_bytes(data: &[u8]) -> Result<(Self, HeaderRepairInfo), DatabaseError> { let invalid_magic_number = data[..MAGICNUMBER.len()] != MAGICNUMBER; let primary_slot = usize::from(data[GOD_BYTE_OFFSET] & PRIMARY_BIT != 0); let recovery_required = (data[GOD_BYTE_OFFSET] & RECOVERY_REQUIRED) != 0; + let two_phase_commit = (data[GOD_BYTE_OFFSET] & TWO_PHASE_COMMIT) != 0; let page_size = get_u32(&data[PAGE_SIZE_OFFSET..]); let region_header_pages = get_u32(&data[REGION_HEADER_PAGES_OFFSET..]); let region_max_data_pages = get_u32(&data[REGION_MAX_DATA_PAGES_OFFSET..]); @@ -226,6 +274,7 @@ impl DatabaseHeader { let result = Self { primary_slot, recovery_required, + two_phase_commit, page_size, region_header_pages, region_max_data_pages, @@ -251,6 +300,9 @@ impl DatabaseHeader { if self.recovery_required { result[GOD_BYTE_OFFSET] |= RECOVERY_REQUIRED; } + if self.two_phase_commit { + result[GOD_BYTE_OFFSET] |= TWO_PHASE_COMMIT; + } result[PAGE_SIZE_OFFSET..(PAGE_SIZE_OFFSET + size_of::())] .copy_from_slice(&self.page_size.to_le_bytes()); result[REGION_HEADER_PAGES_OFFSET..(REGION_HEADER_PAGES_OFFSET + size_of::())] diff --git a/src/tree_store/page_store/page_manager.rs b/src/tree_store/page_store/page_manager.rs index 80fd6fab..9827760c 100644 --- a/src/tree_store/page_store/page_manager.rs +++ b/src/tree_store/page_store/page_manager.rs @@ -1,4 +1,5 @@ use crate::transaction_tracker::TransactionId; +use crate::transactions::{AllocatorStateKey, AllocatorStateTree}; use crate::tree_store::btree_base::{BtreeHeader, Checksum}; use crate::tree_store::page_store::base::{PageHint, MAX_PAGE_INDEX}; use crate::tree_store::page_store::buddy_allocator::BuddyAllocator; @@ -11,7 +12,7 @@ use crate::tree_store::{Page, PageNumber}; use crate::StorageBackend; use crate::{DatabaseError, Result, StorageError}; #[cfg(feature = "logging")] -use log::warn; +use log::{debug, info, warn}; use std::cmp::{max, min}; #[cfg(debug_assertions)] use std::collections::HashMap; @@ -189,6 +190,7 @@ impl TransactionalMemory { ); header.recovery_required = false; + header.two_phase_commit = true; storage .write(0, DB_HEADER_SIZE, true)? .mem_mut() @@ -221,18 +223,7 @@ impl TransactionalMemory { region_max_pages, page_size.try_into().unwrap(), )); - if repair_info.primary_corrupted { - header.swap_primary_slot(); - } else { - // If the secondary is a valid commit, verify that the primary is newer. This handles an edge case where: - // * the primary bit is flipped to the secondary - // * a crash occurs during fsync, such that no other data is written out to the secondary. 
meaning that it contains a valid, but out of date transaction - let secondary_newer = - header.secondary_slot().transaction_id > header.primary_slot().transaction_id; - if secondary_newer && !repair_info.secondary_corrupted { - header.swap_primary_slot(); - } - } + header.pick_primary_for_repair(repair_info)?; assert!(!repair_info.invalid_magic_number); storage .write(0, DB_HEADER_SIZE, true)? @@ -296,19 +287,8 @@ impl TransactionalMemory { // TODO: Also we should recheck the layout let mut was_clean = true; if header.recovery_required { - if repair_info.primary_corrupted { - header.swap_primary_slot(); + if !header.pick_primary_for_repair(repair_info)? { was_clean = false; - } else { - // If the secondary is a valid commit, verify that the primary is newer. This handles an edge case where: - // * the primary bit is flipped to the secondary - // * a crash occurs during fsync, such that no other data is written out to the secondary. meaning that it contains a valid, but out of date transaction - let secondary_newer = - header.secondary_slot().transaction_id > header.primary_slot().transaction_id; - if secondary_newer && !repair_info.secondary_corrupted { - header.swap_primary_slot(); - was_clean = false; - } } if repair_info.invalid_magic_number { return Err(StorageError::Corrupted("Invalid magic number".to_string()).into()); @@ -339,6 +319,10 @@ impl TransactionalMemory { Ok(self.state.lock().unwrap().header.recovery_required) } + pub(crate) fn used_two_phase_commit(&self) -> bool { + self.state.lock().unwrap().header.two_phase_commit + } + pub(crate) fn allocator_hash(&self) -> u128 { self.state.lock().unwrap().allocators.xxh3_hash() } @@ -412,6 +396,133 @@ impl TransactionalMemory { result } + pub(crate) fn reserve_allocator_state( + &self, + tree: &mut AllocatorStateTree, + transaction_id: TransactionId, + ) -> Result { + let state = self.state.lock().unwrap(); + let layout = state.header.layout(); + let num_regions = layout.num_regions(); + let region_header_len = layout.full_region_layout().get_header_pages() + * layout.full_region_layout().page_size(); + let region_tracker_len = state.allocators.region_tracker.to_vec().len(); + drop(state); + + for i in 0..num_regions { + tree.insert( + &AllocatorStateKey::Region(i), + &vec![0; region_header_len as usize].as_ref(), + )?; + } + + tree.insert( + &AllocatorStateKey::RegionTracker, + &vec![0; region_tracker_len].as_ref(), + )?; + + tree.insert( + &AllocatorStateKey::TransactionId, + &transaction_id.raw_id().to_le_bytes().as_ref(), + )?; + + Ok(num_regions) + } + + // Returns true on success, or false if the number of regions has changed + pub(crate) fn try_save_allocator_state( + &self, + tree: &mut AllocatorStateTree, + num_regions: u32, + ) -> Result { + // Has the number of regions changed since reserve_allocator_state() was called? 
+ if num_regions != self.state.lock().unwrap().header.layout().num_regions() { + return Ok(false); + } + + for i in 0..num_regions { + let region_bytes = + &self.state.lock().unwrap().allocators.region_allocators[i as usize].to_vec(); + tree.insert_inplace(&AllocatorStateKey::Region(i), ®ion_bytes.as_ref())?; + } + + let region_tracker_bytes = self + .state + .lock() + .unwrap() + .allocators + .region_tracker + .to_vec(); + tree.insert_inplace( + &AllocatorStateKey::RegionTracker, + ®ion_tracker_bytes.as_ref(), + )?; + + Ok(true) + } + + // Returns true on success, or false if the allocator state was stale (in which case we need + // to fall back to a full repair) + pub(crate) fn try_load_allocator_state(&self, tree: &AllocatorStateTree) -> Result { + // See if this is stale allocator state left over from a previous transaction. That won't + // happen during normal operation, since WriteTransaction::commit() always updates the + // allocator state table before calling TransactionalMemory::commit(), but there are also + // a few places where TransactionalMemory::commit() is called directly without using a + // WriteTransaction. When that happens, any existing allocator state table will be left + // in place but is no longer valid. (And even if there were no such calls today, it would + // be an easy mistake to make! So it's good that we check.) + let transaction_id = TransactionId::new(u64::from_le_bytes( + tree.get(&AllocatorStateKey::TransactionId)? + .unwrap() + .value() + .try_into() + .unwrap(), + )); + if transaction_id != self.get_last_committed_transaction_id()? { + #[cfg(feature = "logging")] + debug!("Ignoring stale allocator state from {:?}", transaction_id); + return Ok(false); + } + + // Load the allocator state + let mut region_allocators = vec![]; + for region in + tree.range(&(AllocatorStateKey::Region(0)..=AllocatorStateKey::Region(u32::MAX)))? + { + region_allocators.push(BuddyAllocator::from_bytes(region?.value())); + } + + let region_tracker = RegionTracker::from_page( + tree.get(&AllocatorStateKey::RegionTracker)? + .unwrap() + .value(), + ); + + let mut state = self.state.lock().unwrap(); + state.allocators = Allocators { + region_tracker, + region_allocators, + }; + + // Resize the allocators to match the current file size + let layout = state.header.layout(); + state.allocators.resize_to(layout); + drop(state); + + // Allocate a larger region tracker page if necessary. This also happens automatically on + // shutdown, but we do it here because we want our allocator state to exactly match the + // result of running a full repair + self.ensure_region_tracker_page()?; + + #[cfg(feature = "logging")] + info!("Successfully loaded allocator state table, repair not needed"); + + self.state.lock().unwrap().header.recovery_required = false; + self.needs_recovery.store(false, Ordering::Release); + + Ok(true) + } + pub(crate) fn is_allocated(&self, page: PageNumber) -> bool { let state = self.state.lock().unwrap(); let allocator = state.get_region(page.region); @@ -419,6 +530,27 @@ impl TransactionalMemory { allocator.is_allocated(page.page_index, page.page_order) } + // Make sure we have a large enough region-tracker page. 
This uses allocate_non_transactional(), + // so it should only be called from outside a transaction + fn ensure_region_tracker_page(&self) -> Result { + let state = self.state.lock().unwrap(); + let tracker_len = state.allocators.region_tracker.to_vec().len(); + let mut tracker_page = state.header.region_tracker(); + drop(state); + + if tracker_page.page_size_bytes(self.page_size) < (tracker_len as u64) { + // Allocate a larger tracker page + self.free(tracker_page); + tracker_page = self + .allocate_non_transactional(tracker_len)? + .get_page_number(); + let mut state = self.state.lock().unwrap(); + state.header.set_region_tracker(tracker_page); + } + + Ok(()) + } + // Relocates the region tracker to a lower page, if possible // Returns true if the page was moved pub(crate) fn relocate_region_tracker(&self) -> Result { @@ -430,6 +562,9 @@ impl TransactionalMemory { let old_tracker_page = state.header.region_tracker(); // allocate acquires this lock, so we need to drop it drop(state); + // Allocate the new page. Unlike other region-tracker allocations, this happens inside + // a transaction, so we use an ordinary allocation (which gets committed or rolled back + // along with the rest of the transaction) rather than allocate_non_transactional() let new_page = self.allocate_lowest(region_tracker_size.try_into().unwrap())?; if new_page.get_page_number().is_before(old_tracker_page) { let mut state = self.state.lock().unwrap(); @@ -519,8 +654,10 @@ impl TransactionalMemory { self.storage.flush(eventual)?; } - // Make our new commit the primary + // Make our new commit the primary, and record whether it was a 2-phase commit. + // These two bits need to be written atomically header.swap_primary_slot(); + header.two_phase_commit = two_phase; // Write the new header to disk self.write_header(&header)?; @@ -797,7 +934,12 @@ impl TransactionalMemory { self.allocated_since_commit.lock().unwrap().contains(&page) } - pub(crate) fn allocate_helper(&self, allocation_size: usize, lowest: bool) -> Result { + pub(crate) fn allocate_helper( + &self, + allocation_size: usize, + lowest: bool, + transactional: bool, + ) -> Result { let required_pages = (allocation_size + self.get_page_size() - 1) / self.get_page_size(); let required_order = ceil_log2(required_pages); @@ -825,10 +967,12 @@ impl TransactionalMemory { assert!(!self.open_dirty_pages.lock().unwrap().contains(&page_number)); } - self.allocated_since_commit - .lock() - .unwrap() - .insert(page_number); + if transactional { + self.allocated_since_commit + .lock() + .unwrap() + .insert(page_number); + } let address_range = page_number.address_range( self.page_size.into(), @@ -960,11 +1104,17 @@ impl TransactionalMemory { } pub(crate) fn allocate(&self, allocation_size: usize) -> Result { - self.allocate_helper(allocation_size, false) + self.allocate_helper(allocation_size, false, true) } pub(crate) fn allocate_lowest(&self, allocation_size: usize) -> Result { - self.allocate_helper(allocation_size, true) + self.allocate_helper(allocation_size, true, true) + } + + // Allocate a page not associated with any transaction. The page is immediately considered committed, + // and won't be rolled back if an abort happens. This is only used for the region tracker + fn allocate_non_transactional(&self, allocation_size: usize) -> Result { + self.allocate_helper(allocation_size, false, false) } pub(crate) fn count_allocated_pages(&self) -> Result { @@ -1017,24 +1167,15 @@ impl Drop for TransactionalMemory { warn!("Failure while finalizing non-durable commit. 
Database may have rolled back"); } } - let mut state = self.state.lock().unwrap(); - let tracker_len = state.allocators.region_tracker.to_vec().len(); - let tracker_page = state.header.region_tracker(); - if tracker_page.page_size_bytes(self.page_size) < (tracker_len as u64) { - drop(state); - // Allocate a larger tracker page - self.free(tracker_page); - if let Ok(tracker_page) = self.allocate(tracker_len) { - state = self.state.lock().unwrap(); - state - .header - .set_region_tracker(tracker_page.get_page_number()); - } else { - #[cfg(feature = "logging")] - warn!("Failure while flushing allocator state. Repair required at restart."); - return; - } + + // Allocate a larger region-tracker page if necessary + if self.ensure_region_tracker_page().is_err() { + #[cfg(feature = "logging")] + warn!("Failure while flushing allocator state. Repair required at restart."); + return; } + + let mut state = self.state.lock().unwrap(); if state .allocators .flush_to( diff --git a/src/tree_store/table_tree.rs b/src/tree_store/table_tree.rs index fdef1ced..0d6d26a1 100644 --- a/src/tree_store/table_tree.rs +++ b/src/tree_store/table_tree.rs @@ -442,6 +442,45 @@ impl<'txn> TableTreeMut<'txn> { Ok(self) } + // Creates a new table, calls the provided closure to insert entries into it, and then + // flushes the table root. The flush is done using insert_inplace(), so it's guaranteed + // that no pages will be allocated or freed after the closure returns + pub(crate) fn create_table_and_flush_table_root( + &mut self, + name: &str, + f: impl FnOnce(&mut BtreeMut) -> Result, + ) -> Result { + assert!(self.pending_table_updates.is_empty()); + assert!(self.tree.get(&name)?.is_none()); + + // Reserve space in the table tree + self.tree.insert( + &name, + &InternalTableDefinition::new::(TableType::Normal, None, 0), + )?; + + // Create an empty table and call the provided closure on it + let mut tree: BtreeMut = BtreeMut::new( + None, + self.guard.clone(), + self.mem.clone(), + self.freed_pages.clone(), + ); + f(&mut tree)?; + + // Finalize the table's checksums + let table_root = tree.finalize_dirty_checksums()?; + let table_length = tree.get_root().map(|x| x.length).unwrap_or_default(); + + // Flush the root to the table tree, without allocating + self.tree.insert_inplace( + &name, + &InternalTableDefinition::new::(TableType::Normal, table_root, table_length), + )?; + + Ok(()) + } + pub(crate) fn finalize_dirty_checksums(&mut self) -> Result> { self.tree.finalize_dirty_checksums() } diff --git a/src/tree_store/table_tree_base.rs b/src/tree_store/table_tree_base.rs index b3f3dbd1..e2bae031 100644 --- a/src/tree_store/table_tree_base.rs +++ b/src/tree_store/table_tree_base.rs @@ -441,6 +441,10 @@ impl Value for InternalTableDefinition { } } + // Be careful if you change this serialization format! The InternalTableDefinition for + // a given table needs to have a consistent serialized length, regardless of the table + // contents, so that create_table_and_flush_table_root() can update the allocator state + // table without causing more allocations fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Vec where Self: 'b,